from concurrent.futures import Executor
import datetime
import logging
from typing import (
TYPE_CHECKING,
Any,
AsyncIterable,
AsyncIterator,
Awaitable,
Callable,
Collection,
Coroutine,
Generator,
Generic,
Iterable,
Iterator,
List,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
overload,
)
from streamable._tools._async import AsyncFunction
from streamable._tools._iter import (
AsyncToSyncIterator,
SyncAsyncIterable,
)
from streamable._tools._logging import setup_logger
from streamable._tools._observation import Observation
from streamable._tools._validation import (
validate_concurrency_executor,
validate_positive_timedelta,
validate_int,
)
from streamable.visitors import Visitor
from streamable.visitors._iter import IteratorVisitor
from streamable.visitors._aiter import AsyncIteratorVisitor
from streamable.visitors._eq import EqualityVisitor
from streamable.visitors._repr import ReprVisitor
from streamable.visitors._involves_async import InvolvesAsyncVisitor
# Configure the "streamable" logger once, when this module is imported.
setup_logger()

if TYPE_CHECKING:  # pragma: no cover
    # These names only appear inside string annotations, so they are never
    # needed at runtime.
    import builtins

    from typing_extensions import Concatenate, ParamSpec

    P = ParamSpec("P")

T = TypeVar("T")
U = TypeVar("U")
V = TypeVar("V")
Exc = TypeVar("Exc", bound=Exception)
class stream(Iterable[T], AsyncIterable[T], Awaitable["stream[T]"]):
    """
    ``stream[T]`` wraps any ``Iterable[T]`` or ``AsyncIterable[T]`` with a lazy fluent interface covering concurrency, batching, buffering, rate limiting, progress logging, and error handling.

    Chain lazy operations: source elements are processed on-the-fly during iteration.

    Operations accept both sync and async functions. They can be mixed within the same ``stream``, which can then be consumed as an ``Iterable`` or an ``AsyncIterable``. Async functions run in the current event loop; one is created if needed.

    Operations are implemented so that the iteration can resume after an exception.

    Args:
        source (``Iterable[T] | AsyncIterable[T] | Callable[[], T] | Callable[[], Coroutine[Any, Any, T]]``):
            ‣ ``Iterable[T] | AsyncIterable[T]``: A new iterator is created from this iterable for each stream iteration.
            ‣ ``Callable[[], T] | Callable[[], Coroutine[Any, Any, T]]``: Function called sequentially to get the next element.

    Returns:
        ``stream[T]``: Stream wrapping the ``source``.

    Example::

        import logging
        from datetime import timedelta
        import httpx
        from httpx import Response, HTTPStatusError

        pokemons: stream[str] = (
            stream(range(10))
            .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
            .throttle(5, per=timedelta(seconds=1))
            .map(httpx.get, concurrency=2)
            .do(Response.raise_for_status)
            .catch(HTTPStatusError, do=logging.warning)
            .map(lambda poke: poke.json()["name"])
        )

        >>> list(pokemons)
        ['bulbasaur', 'ivysaur', 'venusaur', ...]
        >>> [poke async for poke in pokemons]
        ['bulbasaur', 'ivysaur', 'venusaur', ...]
    """

    __slots__ = ("_source", "_upstream")

    # fmt: off
    @overload
    def __init__(self, source: Iterable[T]) -> None: ...
    @overload
    def __init__(self, source: AsyncIterable[T]) -> None: ...
    @overload
    def __init__(self, source: Callable[[], Coroutine[Any, Any, T]]) -> None: ...
    @overload
    def __init__(self, source: Callable[[], T]) -> None: ...
    # fmt: on

    def __init__(
        self,
        source: Union[
            Iterable[T],
            AsyncIterable[T],
            Callable[[], Coroutine[Any, Any, T]],
            Callable[[], T],
        ],
    ) -> None:
        self._source = source
        # A root stream has no parent in the operation chain.
        self._upstream: "Optional[stream]" = None

    @property
    def upstream(self) -> "Optional[stream]":
        """
        The parent stream in the operation chain, if any.

        Returns:
            ``stream | None``
        """
        return self._upstream

    @property
    def source(self) -> Union[Iterable, AsyncIterable, Callable]:
        """
        The source of elements wrapped by this stream.

        Returns:
            ``Iterable | AsyncIterable | Callable``
        """
        return self._source

    def __iter__(self) -> Iterator[T]:
        """
        Iterate synchronously.

        If ``InvolvesAsyncVisitor`` reports that the chain involves async work, the async iterator
        is driven through ``AsyncToSyncIterator``; otherwise ``IteratorVisitor`` builds a sync iterator.
        """
        if self.accept(InvolvesAsyncVisitor()):
            return AsyncToSyncIterator(self.__aiter__())
        return self.accept(IteratorVisitor[T]())

    def __aiter__(self) -> AsyncIterator[T]:
        """
        Iterate asynchronously, via an iterator built by ``AsyncIteratorVisitor``.
        """
        return self.accept(AsyncIteratorVisitor[T]())

    def __eq__(self, other: Any) -> bool:
        """
        Check if this stream is equal to another stream.

        Two streams are equal if they apply the same operations with the same parameters to the same source.

        Args:
            other (``Any``): Object to compare with.

        Returns:
            ``bool``
        """
        return self.accept(EqualityVisitor(other))

    def __repr__(self) -> str:
        """
        Representation of the operation chain, built by ``ReprVisitor``.
        """
        return self.accept(ReprVisitor())

    def __call__(self) -> "stream[T]":
        """
        Iterate until exhaustion without collecting elements.

        Returns:
            ``stream[T]``: ``self``.

        Example::

            state: list[int] = []
            pipeline: stream[int] = stream(range(10)).do(state.append)
            pipeline()
            assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        """
        for _ in self:
            pass
        return self

    def __await__(self) -> Generator[None, None, "stream[T]"]:
        """
        Iterate as ``AsyncIterable`` until exhaustion without collecting elements.

        Returns:
            ``stream[T]``: ``self``.

        Example::

            state: list[int] = []
            pipeline: stream[int] = stream(range(10)).do(state.append)
            await pipeline
            assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        """

        async def consume():
            async for _ in self:
                pass

        yield from (consume().__await__())
        return self

    def __add__(
        self, other: "Union[Iterable[U], AsyncIterable[U]]"
    ) -> "stream[Union[T, U]]":
        """
        Concatenate a stream with an iterable.

        Args:
            other (``Iterable[U] | AsyncIterable[U]``): Iterable to concatenate with this stream.

        Returns:
            ``stream[T | U]``: Stream of ``self``'s elements followed by ``other``'s.

        Example::

            assert list(stream(range(10)) + range(10)) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        """
        # Concatenation = flattening a two-element stream of iterables.
        chain = cast("Iterable[stream[Union[T, U]]]", (self, other))
        return cast("stream[Union[T, U]]", stream(chain).flatten())

    def accept(self, visitor: "Visitor[V]") -> V:
        """
        Accept a visitor to traverse the stream's operation chain.

        Args:
            visitor (``Visitor[V]``): Visitor instance that will traverse the stream chain.

        Returns:
            ``V``: Result of the visitor's traversal.
        """
        return visitor.visit_stream(self)

    def pipe(
        self,
        into: "Callable[Concatenate[stream[T], P], U]",
        *args: "P.args",
        **kwargs: "P.kwargs",
    ) -> U:
        """
        Apply a callable on the stream.

        Args:
            into (``Callable[Concatenate[stream[T], P], U]``): Function to call with the stream as first argument, followed by ``*args`` and ``**kwargs``.
            *args: Positional arguments passed to ``into``.
            **kwargs: Keyword arguments passed to ``into``.

        Returns:
            ``U``: Result of ``into(self, *args, **kwargs)``.

        Example::

            import polars as pl

            pokemons: stream[str] = ...
            pokemons.pipe(pl.DataFrame, schema=["name"]).write_csv("pokemons.csv")
        """
        return into(self, *args, **kwargs)

    def cast(self, into: Type[U]) -> "stream[U]":
        """
        Cast the elements ``into`` a different type.

        This is for type checkers only and has no impact on the iteration.

        Args:
            into (``type[U]``): Target type for stream elements.

        Returns:
            ``stream[U]``: The same stream object, typed as ``stream[U]``.

        Example::

            docs: stream[Any] = stream(['{"foo": "bar"}', '{"foo": "baz"}']).map(json.loads)
            dicts: stream[dict[str, str]] = docs.cast(dict[str, str])
            # the stream remains the same, it's for type checkers only
            assert dicts is docs
        """
        return cast("stream[U]", self)

    def buffer(
        self,
        up_to: int,
    ) -> "stream[T]":
        """
        Buffer upstream elements into a bounded queue (max size ``up_to``), via a background task.

        This decouples the upstream production rate from the downstream consumption rate.

        The background task is a thread during a sync iteration, and an async task during an async iteration.

        Args:
            up_to (``int``): The buffer size. Must be >= 1. When reached, upstream pulling pauses until an element is yielded out of the buffer.

        Returns:
            ``stream[T]``: Upstream with buffering.

        Example::

            pulled: list[int] = []
            buffered_ints = iter(
                stream(range(10))
                .do(pulled.append)
                .buffer(5)
            )
            assert next(buffered_ints) == 0
            time.sleep(1e-3)
            assert pulled == [0, 1, 2, 3, 4, 5]
        """
        validate_int(up_to, gte=1, name="up_to")
        return BufferStream(self, up_to)

    @overload
    def catch(
        self,
        errors: Union[Type[Exc], Tuple[Type[Exc], ...]],
        *,
        where: Union[None, Callable[[Exc], Any], AsyncFunction[Exc, Any]] = None,
        do: Union[None, Callable[[Exc], Any], AsyncFunction[Exc, Any]] = None,
        replace: AsyncFunction[Exc, U],
        stop: bool = False,
    ) -> "stream[Union[T, U]]": ...

    @overload
    def catch(
        self,
        errors: Union[Type[Exc], Tuple[Type[Exc], ...]],
        *,
        where: Union[None, Callable[[Exc], Any], AsyncFunction[Exc, Any]] = None,
        do: Union[None, Callable[[Exc], Any], AsyncFunction[Exc, Any]] = None,
        replace: Callable[[Exc], U],
        stop: bool = False,
    ) -> "stream[Union[T, U]]": ...

    @overload
    def catch(
        self,
        errors: Union[Type[Exc], Tuple[Type[Exc], ...]],
        *,
        where: Union[None, Callable[[Exc], Any], AsyncFunction[Exc, Any]] = None,
        do: Union[None, Callable[[Exc], Any], AsyncFunction[Exc, Any]] = None,
        stop: bool = False,
    ) -> "stream[T]": ...

    def catch(
        self,
        errors: Union[Type[Exc], Tuple[Type[Exc], ...]],
        *,
        where: Union[None, Callable[[Exc], Any], AsyncFunction[Exc, Any]] = None,
        do: Union[None, Callable[[Exc], Any], AsyncFunction[Exc, Any]] = None,
        replace: Union[None, Callable[[Exc], U], AsyncFunction[Exc, U]] = None,
        stop: bool = False,
    ) -> "stream[Union[T, U]]":
        """
        Catch and handle exceptions raised upstream.

        An exception is caught if it is an instance of the given ``errors`` and it satisfies the ``where`` predicate.
        When an exception is caught, the ``do`` callback is called, followed by ``replace``, if provided.

        Args:
            errors (``type[Exception] | tuple[type[Exception], ...]``): Exception type(s) to catch.
            where (``Callable[[Exception], Any] | AsyncFunction[Exception, Any] | None``, optional): Only exceptions for which ``where(exc)`` is truthy are caught.
            do (``Callable[[Exception], Any] | AsyncFunction[Exception, Any] | None``, optional): ``do(exception)`` is called when an exception is caught.
            replace (``Callable[[Exception], U] | AsyncFunction[Exception, U] | None``, optional): ``replace(exception)`` is yielded when an exception is caught.
            stop (``bool``, optional): If ``True``, iteration stops when an exception is caught.

        Returns:
            ``stream[T | U]``: Upstream with exception handling.

        Example::

            inverses: stream[float] = (
                stream(range(10))
                .map(lambda n: round(1 / n, 2))
                .catch(ZeroDivisionError)
            )
            assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

            # `where` a predicate is satisfied
            from http import HTTPStatus

            urls = [
                "https://github.com/ebonnal",
                "https://github.com/ebonnal/streamable",
                "https://github.com/ebonnal/foo",
            ]
            responses: stream[httpx.Response] = (
                stream(urls)
                .map(httpx.get)
                .do(httpx.Response.raise_for_status)
                .catch(
                    httpx.HTTPStatusError,
                    where=lambda e: e.response.status_code == HTTPStatus.NOT_FOUND,
                )
            )
            assert len(list(responses)) == 2

            # `do` a side effect on catch
            errors: list[Exception] = []
            inverses: stream[float] = (
                stream(range(10))
                .map(lambda n: round(1 / n, 2))
                .catch(ZeroDivisionError, do=errors.append)
            )
            assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
            assert len(errors) == 1

            # `replace` with a value
            inverses: stream[float] = (
                stream(range(10))
                .map(lambda n: round(1 / n, 2))
                .catch(ZeroDivisionError, replace=lambda e: float("inf"))
            )
            assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

            # `stop=True` to stop the iteration if an exception is caught
            inverses: stream[float] = (
                stream(range(10))
                .map(lambda n: round(1 / n, 2))
                .catch(ZeroDivisionError, stop=True)
            )
            assert list(inverses) == []
        """
        return CatchStream(
            cast("stream[Union[T, U]]", self),
            errors,
            where=where,
            replace=replace,
            do=do,
            stop=stop,
        )

    @overload
    def do(
        self,
        effect: AsyncFunction[T, Any],
        *,
        concurrency: int = 1,
        as_completed: bool = False,
    ) -> "stream[T]": ...

    @overload
    def do(
        self,
        effect: Callable[[T], Any],
        *,
        concurrency: Union[int, Executor] = 1,
        as_completed: bool = False,
    ) -> "stream[T]": ...

    def do(
        self,
        effect: Union[Callable[[T], Any], AsyncFunction[T, Any]],
        *,
        concurrency: Union[int, Executor] = 1,
        as_completed: bool = False,
    ) -> "stream[T]":
        """
        Perform a side ``effect`` on each upstream element, yielding them unchanged.

        Concurrency:
            - Set the ``concurrency`` param to apply the effect concurrently.
            - Only ``concurrency`` upstream elements are in-flight for processing.
            - Upstream order is preserved unless you set ``as_completed=True``.

        Args:
            effect (``Callable[[T], Any] | AsyncFunction[T, Any]``): The side effect to perform.
            concurrency (``int | Executor``, optional): Concurrency control:
                ‣ ``1`` (default): Sequential processing.
                ‣ ``int > 1``: Concurrent via ``concurrency`` threads, or async tasks if ``effect`` is async.
                ‣ ``Executor``: Uses the provided executor (e.g., a ``ProcessPoolExecutor``); the concurrency is the number of workers.
            as_completed (``bool``, optional): Order of elements:
                ‣ ``False`` (default): Preserve upstream order.
                ‣ ``True``: Yield elements in the order their side effects complete.

        Returns:
            ``stream[T]``: Upstream with side effects.

        Example::

            state: list[int] = []
            storing_ints: stream[int] = stream(range(10)).do(state.append)
            assert list(storing_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
            assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        """
        if isinstance(concurrency, int):
            validate_int(concurrency, gte=1, name="concurrency")
        else:
            validate_concurrency_executor(concurrency, effect, fn_name="effect")
        return DoStream(self, effect, concurrency, as_completed)

    def filter(
        self,
        where: Union[Callable[[T], Any], AsyncFunction[T, Any]] = bool,
    ) -> "stream[T]":
        """
        Filter elements ``where`` a predicate is truthy.

        Args:
            where (``Callable[[T], Any] | AsyncFunction[T, Any]``, optional): An element is kept if ``where(elem)`` is truthy. Defaults to ``bool``.

        Returns:
            ``stream[T]``: Filtered upstream.

        Example::

            even_ints: stream[int] = stream(range(10)).filter(lambda n: n % 2 == 0)
            assert list(even_ints) == [0, 2, 4, 6, 8]
        """
        return FilterStream(self, where)

    @overload
    def flatten(
        self: "stream[Iterable[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[AsyncIterable[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[Iterator[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[AsyncIterator[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[stream[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[Collection[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[Sequence[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[List[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[Set[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[Tuple[U, ...]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[builtins.map[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[builtins.filter[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[range]",
        *,
        concurrency: int = 1,
    ) -> "stream[int]": ...

    @overload
    def flatten(
        self: "stream[str]",
        *,
        concurrency: int = 1,
    ) -> "stream[str]": ...

    @overload
    def flatten(
        self: "stream[SyncAsyncIterable[U]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[Union[Iterable[U], AsyncIterable[U]]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    @overload
    def flatten(
        self: "stream[Union[Iterator[U], AsyncIterator[U]]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]": ...

    def flatten(
        self: "stream[Union[Iterable[U], AsyncIterable[U]]]",
        *,
        concurrency: int = 1,
    ) -> "stream[U]":
        """
        Explode upstream elements, assumed to be ``Iterable`` or ``AsyncIterable``.

        Args:
            concurrency (``int``, optional): Concurrency control:
                ‣ ``1`` (default): Flatten each upstream iterable before moving on to the next one.
                ‣ ``int > 1``: Flatten ``concurrency`` upstream iterables at a time in a round-robin fashion (via threads, or via async tasks for ``AsyncIterable`` elements).

        Returns:
            ``stream[U]``: Stream of all elements from upstream iterables.

        Example::

            chars: stream[str] = stream(["hel", "lo!"]).flatten()
            assert list(chars) == ["h", "e", "l", "l", "o", "!"]

            chars: stream[str] = stream(["hel", "lo", "!"]).flatten(concurrency=2)
            assert list(chars) == ["h", "l", "e", "o", "l", "!"]
        """
        validate_int(concurrency, gte=1, name="concurrency")
        return FlattenStream(self, concurrency)

    @overload
    def group(
        self,
        up_to: Optional[int] = None,
        *,
        within: Optional[datetime.timedelta] = None,
        by: AsyncFunction[T, U],
    ) -> "stream[Tuple[U, List[T]]]": ...

    @overload
    def group(
        self,
        up_to: Optional[int] = None,
        *,
        within: Optional[datetime.timedelta] = None,
        by: Callable[[T], U],
    ) -> "stream[Tuple[U, List[T]]]": ...

    @overload
    def group(
        self,
        up_to: Optional[int] = None,
        *,
        within: Optional[datetime.timedelta] = None,
    ) -> "stream[List[T]]": ...

    def group(
        self,
        up_to: Optional[int] = None,
        *,
        within: Optional[datetime.timedelta] = None,
        by: Union[None, Callable[[T], U], AsyncFunction[T, U]] = None,
    ) -> "Union[stream[List[T]], stream[Tuple[U, List[T]]]]":
        """
        Group elements into batches:
            - ``up_to`` a given batch size
            - ``within`` a given time interval
            - ``by`` a given key, yielding ``(key, elements)`` pairs

        These parameters can be combined.

        If an exception is encountered during grouping, the pending batch is yielded (all pending batches if ``by`` is set), then the exception is raised.

        Args:
            up_to (``int | None``, optional): If a batch reaches that size, it is yielded. Must be >= 1.
            within (``timedelta | None``, optional): A batch pending for more than ``within`` is yielded, even if under ``up_to`` elements.
            by (``Callable[[T], U] | AsyncFunction[T, U] | None``, optional): Co-group elements into ``(key, elements)`` tuples.

        Returns:
            ``stream[list[T]]``, or ``stream[tuple[U, list[T]]]`` if ``by`` is set.

        Example::

            int_batches: stream[list[int]] = stream(range(10)).group(5)
            assert list(int_batches) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

            # `within` a given time interval
            from datetime import timedelta

            int_1s_batches: stream[list[int]] = (
                stream(range(10))
                .throttle(2, per=timedelta(seconds=1))
                .group(within=timedelta(seconds=0.99))
            )
            assert list(int_1s_batches) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

            # `by` a given key, yielding `(key, elements)` pairs
            ints_by_parity: stream[tuple[str, list[int]]] = (
                stream(range(10))
                .group(by=lambda n: "odd" if n % 2 else "even")
            )
            assert list(ints_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]
        """
        if up_to is not None:
            validate_int(up_to, gte=1, name="up_to")
        if within is not None:
            validate_positive_timedelta(within, name="within")
        return GroupStream(self, up_to, within, by)

    @overload
    def map(
        self,
        into: AsyncFunction[T, U],
        *,
        concurrency: int = 1,
        as_completed: bool = False,
    ) -> "stream[U]": ...

    @overload
    def map(
        self,
        into: Callable[[T], U],
        *,
        concurrency: Union[int, Executor] = 1,
        as_completed: bool = False,
    ) -> "stream[U]": ...

    def map(
        self,
        into: Union[Callable[[T], U], AsyncFunction[T, U]],
        *,
        concurrency: Union[int, Executor] = 1,
        as_completed: bool = False,
    ) -> "stream[U]":
        """
        Transform upstream elements.

        Concurrency:
            - Set the ``concurrency`` param to apply the transformation concurrently.
            - Only ``concurrency`` upstream elements are in-flight for processing.
            - Upstream order is preserved unless you set ``as_completed=True``.

        Args:
            into (``Callable[[T], U] | AsyncFunction[T, U]``): The transformation.
            concurrency (``int | Executor``, optional): Concurrency control:
                ‣ ``1`` (default): Sequential processing.
                ‣ ``int > 1``: Concurrent via ``concurrency`` threads, or async tasks if ``into`` is async.
                ‣ ``Executor``: Uses the provided executor (e.g., a ``ProcessPoolExecutor``); the concurrency is the number of workers.
            as_completed (``bool``, optional): Order of results:
                ‣ ``False`` (default): Preserve upstream order.
                ‣ ``True``: Yield results as they become available.

        Returns:
            ``stream[U]``: Mapped upstream.

        Example::

            int_chars: stream[str] = stream(range(10)).map(str)
            assert list(int_chars) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

            # concurrency via threads
            pokemons: stream[str] = (
                stream(range(1, 4))
                .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
                .map(httpx.get, concurrency=2)
                .map(lambda poke: poke.json()["name"])
            )
            assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']

            # concurrency via `async` coroutines within async context
            async with httpx.AsyncClient() as http_client:
                pokemons: stream[str] = (
                    stream(range(1, 4))
                    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
                    .map(http_client.get, concurrency=2)
                    .map(lambda poke: poke.json()["name"])
                )
                assert [name async for name in pokemons] == ['bulbasaur', 'ivysaur', 'venusaur']

            # concurrency via `async` coroutines within sync context
            with asyncio.Runner() as runner:
                http_client = httpx.AsyncClient()
                pokemons: stream[str] = (
                    stream(range(1, 4))
                    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
                    .map(http_client.get, concurrency=2)
                    .map(lambda poke: poke.json()["name"])
                )
                # uses runner's loop
                assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
                runner.run(http_client.aclose())

            # concurrency via processes
            with ProcessPoolExecutor(max_workers=10) as processes:
                state: list[int] = []
                # ints are mapped
                assert list(
                    stream(range(10))
                    .map(state.append, concurrency=processes)
                ) == [None] * 10
                # the `state` of the main process is not mutated
                assert state == []

            # starmap
            from streamable import star

            enumerated_pokes: stream[str] = (
                stream(enumerate(['bulbasaur', 'ivysaur', 'venusaur']))
                .map(star(lambda index, poke: f"#{index + 1} {poke}"))
            )
            assert list(enumerated_pokes) == ['#1 bulbasaur', '#2 ivysaur', '#3 venusaur']
        """
        if isinstance(concurrency, int):
            validate_int(concurrency, gte=1, name="concurrency")
        else:
            validate_concurrency_executor(concurrency, into, fn_name="into")
        return MapStream(self, into, concurrency, as_completed)

    def observe(
        self,
        subject: str = "elements",
        *,
        every: Union[None, int, datetime.timedelta] = None,
        do: Union[
            Callable[[Observation], Any],
            AsyncFunction[Observation, Any],
        ] = logging.getLogger("streamable").info,
    ) -> "stream[T]":
        """
        Observe the iteration progress:
            - elapsed time since the iteration started
            - number of elements yielded upstream
            - number of errors raised upstream

        A ``streamable.Observation`` is passed to the ``do`` callback (the default emits a log), at a frequency defined by ``every``.

        If an error is raised by ``do``, it is silently ignored and is not included in the errors count.

        Args:
            subject (``str``, optional): Description of elements being observed.
            every (``int | timedelta | None``, optional): When to emit observations:
                ‣ ``None`` (default): When the elements/errors counts reach powers of 2.
                ‣ ``int``: Periodically, when ``every`` elements or errors have been emitted. Must be >= 1.
                ‣ ``timedelta``: Periodically, ``every`` time interval. Must be positive.
            do (``Callable[[streamable.Observation], Any] | AsyncFunction[streamable.Observation, Any]``, optional): Callback receiving a ``streamable.Observation`` (subject, elapsed, errors, elements).

        Returns:
            ``stream[T]``: Stream with progress observed.

        Example::

            observed_ints: stream[int] = stream(range(10)).observe("ints")
            assert list(observed_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

            # observe every 1k elements (or errors)
            observed_ints = stream(range(10)).observe("ints", every=1000)
            # observe every 5 seconds
            observed_ints = stream(range(10)).observe("ints", every=timedelta(seconds=5))

            observed_ints = stream(range(10)).observe("ints", do=other_logger.info)
            observed_ints = stream(range(10)).observe("ints", do=logs.append)
            observed_ints = stream(range(10)).observe("ints", do=print)
        """
        if isinstance(every, int):
            validate_int(every, gte=1, name="every")
        elif isinstance(every, datetime.timedelta):
            validate_positive_timedelta(every, name="every")
        return ObserveStream(self, subject, every, do)

    @overload
    def skip(self, until: int) -> "stream[T]": ...

    @overload
    def skip(
        self, *, until: Union[Callable[[T], Any], AsyncFunction[T, Any]]
    ) -> "stream[T]": ...

    def skip(
        self,
        until: Union[int, Callable[[T], Any], AsyncFunction[T, Any]],
    ) -> "stream[T]":
        """
        Skip a given number of elements, or skip ``until`` a predicate is satisfied.

        Args:
            until (``int | Callable[[T], Any] | AsyncFunction[T, Any]``): Skip control:
                ‣ ``int``: Skip the first ``until`` elements. Must be >= 0.
                ‣ ``Callable[[T], Any] | AsyncFunction[T, Any]``: Skip until ``until(elem)`` is truthy, then yield that element and all subsequent ones.

        Returns:
            ``stream[T]``: Stream of remaining upstream elements after skipping.

        Example::

            ints_after_5: stream[int] = stream(range(10)).skip(5)
            assert list(ints_after_5) == [5, 6, 7, 8, 9]

            ints_after_5: stream[int] = stream(range(10)).skip(until=lambda n: n >= 5)
            assert list(ints_after_5) == [5, 6, 7, 8, 9]
        """
        if isinstance(until, int):
            validate_int(until, gte=0, name="until")
        return SkipStream(self, until)

    @overload
    def take(self, until: int) -> "stream[T]": ...

    @overload
    def take(
        self, until: Union[Callable[[T], Any], AsyncFunction[T, Any]]
    ) -> "stream[T]": ...

    def take(
        self,
        until: Union[int, Callable[[T], Any], AsyncFunction[T, Any]],
    ) -> "stream[T]":
        """
        Take a given number of elements, or take ``until`` a predicate is satisfied. Remaining upstream elements are not consumed.

        Args:
            until (``int | Callable[[T], Any] | AsyncFunction[T, Any]``): Stop control:
                ‣ ``int``: Take the first ``until`` elements. Must be >= 0.
                ‣ ``Callable[[T], Any] | AsyncFunction[T, Any]``: Take until ``until(elem)`` is truthy, then stop. The matching element is not yielded.

        Returns:
            ``stream[T]``: Stream of taken upstream elements.

        Example::

            first_5_ints: stream[int] = stream(range(10)).take(5)
            assert list(first_5_ints) == [0, 1, 2, 3, 4]

            first_5_ints: stream[int] = stream(range(10)).take(until=lambda n: n == 5)
            assert list(first_5_ints) == [0, 1, 2, 3, 4]
        """
        if isinstance(until, int):
            validate_int(until, gte=0, name="until")
        return TakeStream(self, until)

    def throttle(
        self,
        up_to: int,
        *,
        per: datetime.timedelta,
    ) -> "stream[T]":
        """
        Limit to ``up_to`` emissions (elements or exceptions) ``per`` time window.

        For each new upstream element or exception, if fewer than ``up_to`` were emitted in the last ``per`` interval, it is emitted immediately; otherwise the iteration sleeps until the oldest emission leaves the sliding window.

        Args:
            up_to (``int``): Maximum emissions allowed in the sliding window (``per``). Must be >= 1.
            per (``timedelta``): Duration of the sliding time window. Must be positive.

        Returns:
            ``stream[T]``: Stream with rate limiting.

        Example::

            from datetime import timedelta

            throttled_ints: stream[int] = stream(range(10)).throttle(3, per=timedelta(seconds=1))
            # takes 3 seconds
            assert list(throttled_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        """
        validate_int(up_to, gte=1, name="up_to")
        validate_positive_timedelta(per, name="per")
        return ThrottleStream(self, up_to, per)
class DownStream(stream[U], Generic[T, U]):
    """
    Base class for operation streams: a ``stream[U]`` derived from an upstream ``stream[T]``.
    """

    __slots__ = ()

    def __init__(self, upstream: stream[T]) -> None:
        # Only the parent link is stored; ``source`` is resolved through the chain.
        self._upstream: stream[T] = upstream

    @property
    def upstream(self) -> stream[T]:
        """
        The parent stream that this downstream stream operates on.

        Returns:
            ``stream[T]``: Upstream stream in the operation chain.
        """
        return self._upstream

    @property
    def source(self) -> Union[Iterable, AsyncIterable, Callable]:
        # Delegate to the parent: every stream in a chain shares the root's source.
        parent = self._upstream
        return parent.source
class BufferStream(DownStream[T, T]):
    """Stream created by ``stream.buffer``; stores the buffer capacity ``up_to``."""

    __slots__ = ("_up_to",)

    def __init__(self, upstream: stream[T], up_to: int) -> None:
        super().__init__(upstream)
        self._up_to = up_to

    def accept(self, visitor: "Visitor[V]") -> V:
        # Dispatch to the visitor's buffer-specific handler.
        handler = visitor.visit_buffer_stream
        return handler(self)
class CatchStream(DownStream[T, Union[T, U]]):
    """Stream created by ``stream.catch``; stores the exception-handling configuration."""

    __slots__ = ("_errors", "_where", "_replace", "_do", "_stop")

    def __init__(
        self,
        upstream: stream[T],
        errors: Union[Type[Exc], Tuple[Type[Exc], ...]],
        where: Union[None, Callable[[Exc], Any], AsyncFunction[Exc, Any]],
        replace: Union[None, Callable[[Exc], U], AsyncFunction[Exc, U]],
        do: Union[None, Callable[[Exc], Any], AsyncFunction[Exc, Any]],
        stop: bool,
    ) -> None:
        super().__init__(upstream)
        # Which exceptions are caught: type(s) plus optional predicate.
        self._errors, self._where = errors, where
        # What happens on catch: side effect, replacement value, and whether to stop.
        self._do, self._replace, self._stop = do, replace, stop

    def accept(self, visitor: "Visitor[V]") -> V:
        # Dispatch to the visitor's catch-specific handler.
        handler = visitor.visit_catch_stream
        return handler(self)
class DoStream(DownStream[T, T]):
    """Stream created by ``stream.do``; stores the side effect and its concurrency settings."""

    __slots__ = ("_effect", "_concurrency", "_as_completed")

    def __init__(
        self,
        upstream: stream[T],
        effect: Union[Callable[[T], Any], AsyncFunction[T, Any]],
        concurrency: Union[int, Executor],
        as_completed: bool,
    ) -> None:
        super().__init__(upstream)
        self._effect = effect
        # ``concurrency`` is either a worker count or an Executor to submit to.
        self._concurrency, self._as_completed = concurrency, as_completed

    def accept(self, visitor: "Visitor[V]") -> V:
        # Dispatch to the visitor's do-specific handler.
        handler = visitor.visit_do_stream
        return handler(self)
class FilterStream(DownStream[T, T]):
    """Stream created by ``stream.filter``; stores the ``where`` predicate."""

    __slots__ = ("_where",)

    def __init__(
        self,
        upstream: stream[T],
        # Sync or async predicate, matching what ``stream.filter`` accepts.
        where: Union[Callable[[T], Any], AsyncFunction[T, Any]],
    ) -> None:
        super().__init__(upstream)
        self._where = where

    def accept(self, visitor: "Visitor[V]") -> V:
        return visitor.visit_filter_stream(self)
class FlattenStream(DownStream[Union[Iterable[T], AsyncIterable[T]], T]):
    """Stream created by ``stream.flatten``; stores how many upstream iterables are flattened at a time."""

    __slots__ = ("_concurrency",)

    def __init__(
        self,
        upstream: stream[Union[Iterable[T], AsyncIterable[T]]],
        concurrency: int,
    ) -> None:
        super().__init__(upstream)
        self._concurrency = concurrency

    def accept(self, visitor: "Visitor[V]") -> V:
        # Dispatch to the visitor's flatten-specific handler.
        handler = visitor.visit_flatten_stream
        return handler(self)
class GroupStream(DownStream[T, List[T]]):
    """Stream created by ``stream.group``; stores the size, time and key grouping criteria."""

    __slots__ = ("_up_to", "_within", "_by")

    def __init__(
        self,
        upstream: stream[T],
        up_to: Optional[int],
        within: Optional[datetime.timedelta],
        by: Union[None, Callable[[T], Any], AsyncFunction[T, Any]],
    ) -> None:
        super().__init__(upstream)
        # Each criterion is optional (``None`` when not set).
        self._up_to, self._within, self._by = up_to, within, by

    def accept(self, visitor: "Visitor[V]") -> V:
        # Dispatch to the visitor's group-specific handler.
        handler = visitor.visit_group_stream
        return handler(self)
class MapStream(DownStream[T, U]):
    """Stream created by ``stream.map``; stores the transformation and its concurrency settings."""

    __slots__ = ("_into", "_concurrency", "_as_completed")

    def __init__(
        self,
        upstream: stream[T],
        into: Union[Callable[[T], U], AsyncFunction[T, U]],
        concurrency: Union[int, Executor],
        as_completed: bool,
    ) -> None:
        super().__init__(upstream)
        self._into = into
        # ``concurrency`` is either a worker count or an Executor to submit to.
        self._concurrency, self._as_completed = concurrency, as_completed

    def accept(self, visitor: "Visitor[V]") -> V:
        # Dispatch to the visitor's map-specific handler.
        handler = visitor.visit_map_stream
        return handler(self)
class ObserveStream(DownStream[T, T]):
    """Stream created by ``stream.observe``; stores the subject, emission frequency and callback."""

    __slots__ = ("_subject", "_every", "_do")

    def __init__(
        self,
        upstream: stream[T],
        subject: str,
        every: Union[None, int, datetime.timedelta],
        do: Union[Callable[[Observation], Any], AsyncFunction[Observation, Any]],
    ) -> None:
        super().__init__(upstream)
        self._subject, self._every, self._do = subject, every, do

    def accept(self, visitor: "Visitor[V]") -> V:
        # Dispatch to the visitor's observe-specific handler.
        handler = visitor.visit_observe_stream
        return handler(self)
class SkipStream(DownStream[T, T]):
    """Stream created by ``stream.skip``; stores a count or a predicate in ``until``."""

    # A tuple, like every sibling class: a bare string is also accepted by Python
    # but silently becomes wrong if someone "adds" a second name by concatenation.
    __slots__ = ("_until",)

    def __init__(
        self,
        upstream: stream[T],
        until: Union[
            int,
            Callable[[T], Any],
            AsyncFunction[T, Any],
        ],
    ) -> None:
        super().__init__(upstream)
        self._until = until

    def accept(self, visitor: "Visitor[V]") -> V:
        return visitor.visit_skip_stream(self)
class TakeStream(DownStream[T, T]):
    """Stream created by ``stream.take``; stores a count or a stop predicate in ``until``."""

    # A tuple, like every sibling class: a bare string is also accepted by Python
    # but silently becomes wrong if someone "adds" a second name by concatenation.
    __slots__ = ("_until",)

    def __init__(
        self,
        upstream: stream[T],
        until: Union[
            int,
            Callable[[T], Any],
            AsyncFunction[T, Any],
        ],
    ) -> None:
        super().__init__(upstream)
        self._until = until

    def accept(self, visitor: "Visitor[V]") -> V:
        return visitor.visit_take_stream(self)
class ThrottleStream(DownStream[T, T]):
    """Stream created by ``stream.throttle``; stores the max emissions ``up_to`` per sliding window ``per``."""

    __slots__ = ("_up_to", "_per")

    def __init__(
        self,
        upstream: stream[T],
        up_to: int,
        per: datetime.timedelta,
    ) -> None:
        super().__init__(upstream)
        self._up_to, self._per = up_to, per

    def accept(self, visitor: "Visitor[V]") -> V:
        # Dispatch to the visitor's throttle-specific handler.
        handler = visitor.visit_throttle_stream
        return handler(self)