Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .stats.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
configured_endpoints: 21
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/anthropic-fd67aea6883f1ee9e46f31a42d3940f0acb1749e787055bd9b9f278b20fa53ec.yml
openapi_spec_url: https://storage.googleapis.com/stainless-sdk-openapi-specs/anthropic-75f0573c3d6d79650bcbd8b1b4fcf93ce146d567afeb1061cd4afccf8d1d6799.yml
4 changes: 2 additions & 2 deletions api.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Methods:
- <code title="get /v1/messages/batches">client.messages.batches.<a href="./src/anthropic/resources/messages/batches.py">list</a>(\*\*<a href="src/anthropic/types/messages/batch_list_params.py">params</a>) -> <a href="./src/anthropic/types/messages/message_batch.py">SyncPage[MessageBatch]</a></code>
- <code title="delete /v1/messages/batches/{message_batch_id}">client.messages.batches.<a href="./src/anthropic/resources/messages/batches.py">delete</a>(message_batch_id) -> <a href="./src/anthropic/types/messages/deleted_message_batch.py">DeletedMessageBatch</a></code>
- <code title="post /v1/messages/batches/{message_batch_id}/cancel">client.messages.batches.<a href="./src/anthropic/resources/messages/batches.py">cancel</a>(message_batch_id) -> <a href="./src/anthropic/types/messages/message_batch.py">MessageBatch</a></code>
- <code title="get /v1/messages/batches/{message_batch_id}/results">client.messages.batches.<a href="./src/anthropic/resources/messages/batches.py">results</a>(message_batch_id) -> BinaryAPIResponse</code>
- <code title="get /v1/messages/batches/{message_batch_id}/results">client.messages.batches.<a href="./src/anthropic/resources/messages/batches.py">results</a>(message_batch_id) -> <a href="./src/anthropic/types/messages/message_batch_individual_response.py">JSONLDecoder[MessageBatchIndividualResponse]</a></code>

# Models

Expand Down Expand Up @@ -217,4 +217,4 @@ Methods:
- <code title="get /v1/messages/batches?beta=true">client.beta.messages.batches.<a href="./src/anthropic/resources/beta/messages/batches.py">list</a>(\*\*<a href="src/anthropic/types/beta/messages/batch_list_params.py">params</a>) -> <a href="./src/anthropic/types/beta/messages/beta_message_batch.py">SyncPage[BetaMessageBatch]</a></code>
- <code title="delete /v1/messages/batches/{message_batch_id}?beta=true">client.beta.messages.batches.<a href="./src/anthropic/resources/beta/messages/batches.py">delete</a>(message_batch_id) -> <a href="./src/anthropic/types/beta/messages/beta_deleted_message_batch.py">BetaDeletedMessageBatch</a></code>
- <code title="post /v1/messages/batches/{message_batch_id}/cancel?beta=true">client.beta.messages.batches.<a href="./src/anthropic/resources/beta/messages/batches.py">cancel</a>(message_batch_id) -> <a href="./src/anthropic/types/beta/messages/beta_message_batch.py">BetaMessageBatch</a></code>
- <code title="get /v1/messages/batches/{message_batch_id}/results?beta=true">client.beta.messages.batches.<a href="./src/anthropic/resources/beta/messages/batches.py">results</a>(message_batch_id) -> BinaryAPIResponse</code>
- <code title="get /v1/messages/batches/{message_batch_id}/results?beta=true">client.beta.messages.batches.<a href="./src/anthropic/resources/beta/messages/batches.py">results</a>(message_batch_id) -> <a href="./src/anthropic/types/beta/messages/beta_message_batch_individual_response.py">JSONLDecoder[BetaMessageBatchIndividualResponse]</a></code>
101 changes: 101 additions & 0 deletions src/anthropic/_decoders/jsonl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from __future__ import annotations

import json
from typing_extensions import Generic, TypeVar, Iterator, AsyncIterator

import httpx

from .._models import construct_type_unchecked

_T = TypeVar("_T")


class JSONLDecoder(Generic[_T]):
"""A decoder for [JSON Lines](https://jsonlines.org) format.

This class provides an iterator over a byte-iterator that parses each JSON Line
into a given type.
"""

http_response: httpx.Response | None
"""The HTTP response this decoder was constructed from"""

def __init__(
self, *, raw_iterator: Iterator[bytes], line_type: type[_T], http_response: httpx.Response | None
) -> None:
super().__init__()
self.http_response = http_response
self._raw_iterator = raw_iterator
self._line_type = line_type
self._iterator = self.__decode__()

def __decode__(self) -> Iterator[_T]:
buf = b""
for chunk in self._raw_iterator:
for line in chunk.splitlines(keepends=True):
buf += line
if buf.endswith((b"\r", b"\n", b"\r\n")):
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
buf = b""

# flush
if buf:
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)

def __next__(self) -> _T:
return self._iterator.__next__()

def __iter__(self) -> Iterator[_T]:
for item in self._iterator:
yield item


class AsyncJSONLDecoder(Generic[_T]):
"""A decoder for [JSON Lines](https://jsonlines.org) format.

This class provides an async iterator over a byte-iterator that parses each JSON Line
into a given type.
"""

http_response: httpx.Response | None

def __init__(
self, *, raw_iterator: AsyncIterator[bytes], line_type: type[_T], http_response: httpx.Response | None
) -> None:
super().__init__()
self.http_response = http_response
self._raw_iterator = raw_iterator
self._line_type = line_type
self._iterator = self.__decode__()

async def __decode__(self) -> AsyncIterator[_T]:
buf = b""
async for chunk in self._raw_iterator:
for line in chunk.splitlines(keepends=True):
buf += line
if buf.endswith((b"\r", b"\n", b"\r\n")):
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
buf = b""

# flush
if buf:
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)

async def __anext__(self) -> _T:
return await self._iterator.__anext__()

async def __aiter__(self) -> AsyncIterator[_T]:
async for item in self._iterator:
yield item
22 changes: 22 additions & 0 deletions src/anthropic/_legacy_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ._constants import RAW_RESPONSE_HEADER
from ._streaming import Stream, AsyncStream, is_stream_class_type, extract_stream_chunk_type
from ._exceptions import APIResponseValidationError
from ._decoders.jsonl import JSONLDecoder, AsyncJSONLDecoder

if TYPE_CHECKING:
from ._models import FinalRequestOptions
Expand Down Expand Up @@ -204,6 +205,27 @@ def _parse(self, *, to: type[_T] | None = None) -> R | _T:

origin = get_origin(cast_to) or cast_to

if inspect.isclass(origin):
if issubclass(cast(Any, origin), JSONLDecoder):
return cast(
R,
cast("type[JSONLDecoder[Any]]", cast_to)(
raw_iterator=self.http_response.iter_bytes(chunk_size=4096),
line_type=extract_type_arg(cast_to, 0),
http_response=self.http_response,
),
)

if issubclass(cast(Any, origin), AsyncJSONLDecoder):
return cast(
R,
cast("type[AsyncJSONLDecoder[Any]]", cast_to)(
raw_iterator=self.http_response.aiter_bytes(chunk_size=4096),
line_type=extract_type_arg(cast_to, 0),
http_response=self.http_response,
),
)

if self._stream:
if to:
if not is_stream_class_type(to):
Expand Down
22 changes: 22 additions & 0 deletions src/anthropic/_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from ._constants import RAW_RESPONSE_HEADER, OVERRIDE_CAST_TO_HEADER
from ._streaming import Stream, AsyncStream, is_stream_class_type, extract_stream_chunk_type
from ._exceptions import AnthropicError, APIResponseValidationError
from ._decoders.jsonl import JSONLDecoder, AsyncJSONLDecoder

if TYPE_CHECKING:
from ._models import FinalRequestOptions
Expand Down Expand Up @@ -138,6 +139,27 @@ def _parse(self, *, to: type[_T] | None = None) -> R | _T:

origin = get_origin(cast_to) or cast_to

if inspect.isclass(origin):
if issubclass(cast(Any, origin), JSONLDecoder):
return cast(
R,
cast("type[JSONLDecoder[Any]]", cast_to)(
raw_iterator=self.http_response.iter_bytes(chunk_size=4096),
line_type=extract_type_arg(cast_to, 0),
http_response=self.http_response,
),
)

if issubclass(cast(Any, origin), AsyncJSONLDecoder):
return cast(
R,
cast("type[AsyncJSONLDecoder[Any]]", cast_to)(
raw_iterator=self.http_response.aiter_bytes(chunk_size=4096),
line_type=extract_type_arg(cast_to, 0),
http_response=self.http_response,
),
)

if self._is_sse_stream:
if to:
if not is_stream_class_type(to):
Expand Down
41 changes: 15 additions & 26 deletions src/anthropic/resources/beta/messages/batches.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,15 @@
)
from ...._compat import cached_property
from ...._resource import SyncAPIResource, AsyncAPIResource
from ...._response import (
BinaryAPIResponse,
AsyncBinaryAPIResponse,
StreamedBinaryAPIResponse,
AsyncStreamedBinaryAPIResponse,
to_streamed_response_wrapper,
to_custom_raw_response_wrapper,
async_to_streamed_response_wrapper,
to_custom_streamed_response_wrapper,
async_to_custom_raw_response_wrapper,
async_to_custom_streamed_response_wrapper,
)
from ...._response import to_streamed_response_wrapper, async_to_streamed_response_wrapper
from ....pagination import SyncPage, AsyncPage
from ...._base_client import AsyncPaginator, make_request_options
from ...._decoders.jsonl import JSONLDecoder, AsyncJSONLDecoder
from ....types.beta.messages import batch_list_params, batch_create_params
from ....types.anthropic_beta_param import AnthropicBetaParam
from ....types.beta.messages.beta_message_batch import BetaMessageBatch
from ....types.beta.messages.beta_deleted_message_batch import BetaDeletedMessageBatch
from ....types.beta.messages.beta_message_batch_individual_response import BetaMessageBatchIndividualResponse

__all__ = ["Batches", "AsyncBatches"]

Expand Down Expand Up @@ -356,7 +347,7 @@ def results(
extra_query: Query | None = None,
extra_body: Body | None = None,
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> BinaryAPIResponse:
) -> JSONLDecoder[BetaMessageBatchIndividualResponse]:
"""
Streams the results of a Message Batch as a `.jsonl` file.

Expand All @@ -379,7 +370,7 @@ def results(
"""
if not message_batch_id:
raise ValueError(f"Expected a non-empty value for `message_batch_id` but received {message_batch_id!r}")
extra_headers = {"Accept": "application/binary", **(extra_headers or {})}
extra_headers = {"Accept": "application/x-jsonl", **(extra_headers or {})}
extra_headers = {
**strip_not_given(
{
Expand All @@ -396,7 +387,8 @@ def results(
options=make_request_options(
extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
),
cast_to=BinaryAPIResponse,
cast_to=JSONLDecoder[BetaMessageBatchIndividualResponse],
stream=True,
)


Expand Down Expand Up @@ -717,7 +709,7 @@ async def results(
extra_query: Query | None = None,
extra_body: Body | None = None,
timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> AsyncBinaryAPIResponse:
) -> AsyncJSONLDecoder[BetaMessageBatchIndividualResponse]:
"""
Streams the results of a Message Batch as a `.jsonl` file.

Expand All @@ -740,7 +732,7 @@ async def results(
"""
if not message_batch_id:
raise ValueError(f"Expected a non-empty value for `message_batch_id` but received {message_batch_id!r}")
extra_headers = {"Accept": "application/binary", **(extra_headers or {})}
extra_headers = {"Accept": "application/x-jsonl", **(extra_headers or {})}
extra_headers = {
**strip_not_given(
{
Expand All @@ -757,7 +749,8 @@ async def results(
options=make_request_options(
extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
),
cast_to=AsyncBinaryAPIResponse,
cast_to=AsyncJSONLDecoder[BetaMessageBatchIndividualResponse],
stream=True,
)


Expand All @@ -780,9 +773,8 @@ def __init__(self, batches: Batches) -> None:
self.cancel = _legacy_response.to_raw_response_wrapper(
batches.cancel,
)
self.results = to_custom_raw_response_wrapper(
self.results = _legacy_response.to_raw_response_wrapper(
batches.results,
BinaryAPIResponse,
)


Expand All @@ -805,9 +797,8 @@ def __init__(self, batches: AsyncBatches) -> None:
self.cancel = _legacy_response.async_to_raw_response_wrapper(
batches.cancel,
)
self.results = async_to_custom_raw_response_wrapper(
self.results = _legacy_response.async_to_raw_response_wrapper(
batches.results,
AsyncBinaryAPIResponse,
)


Expand All @@ -830,9 +821,8 @@ def __init__(self, batches: Batches) -> None:
self.cancel = to_streamed_response_wrapper(
batches.cancel,
)
self.results = to_custom_streamed_response_wrapper(
self.results = to_streamed_response_wrapper(
batches.results,
StreamedBinaryAPIResponse,
)


Expand All @@ -855,7 +845,6 @@ def __init__(self, batches: AsyncBatches) -> None:
self.cancel = async_to_streamed_response_wrapper(
batches.cancel,
)
self.results = async_to_custom_streamed_response_wrapper(
self.results = async_to_streamed_response_wrapper(
batches.results,
AsyncStreamedBinaryAPIResponse,
)
Loading