API Reference
༄ streamable
- class streamable.stream(source)
stream[T] wraps any Iterable[T] or AsyncIterable[T] with a lazy, fluent interface covering concurrency, batching, buffering, rate limiting, progress logging, and error handling. Operations are chained lazily: source elements are processed on the fly, during iteration.
Operations accept both sync and async functions, which can be mixed within the same stream. The stream can then be consumed as an Iterable or as an AsyncIterable. Async functions run in the current event loop; one is created if needed. Operations are implemented so that iteration can resume after an exception.
- Parameters:
source (Iterable[T] | AsyncIterable[T] | Callable[[], T] | Callable[[], Coroutine[Any, Any, T]]) – The source of elements:
  - Iterable[T] | AsyncIterable[T]: a new iterator is created from this iterable for each iteration over the stream.
  - Callable[[], T] | Callable[[], Coroutine[Any, Any, T]]: a function called sequentially to get the next element.
- Returns:
Stream wrapping the source.
- Return type:
stream[T]
Example:
import logging
from datetime import timedelta

import httpx
from httpx import Response, HTTPStatusError

from streamable import stream

pokemons: stream[str] = (
    stream(range(10))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .throttle(5, per=timedelta(seconds=1))
    .map(httpx.get, concurrency=2)
    .do(Response.raise_for_status)
    .catch(HTTPStatusError, do=logging.warning)
    .map(lambda poke: poke.json()["name"])
)

>>> list(pokemons)
['bulbasaur', 'ivysaur', 'venusaur', ...]
>>> [poke async for poke in pokemons]
['bulbasaur', 'ivysaur', 'venusaur', ...]
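The lazy, re-iterable behavior described above can be illustrated with a minimal pure-Python sketch. LazySketch is a hypothetical class, not part of streamable and not its implementation: each operation only composes a new iterator factory, nothing runs until iteration, and every iteration rebuilds the chain from a fresh iterator over the source.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import Any, Generic, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class LazySketch(Generic[T]):
    """Hypothetical lazy, fluent wrapper (illustration only, not streamable's API)."""

    def __init__(self, make_iterator: Callable[[], Iterator[T]]) -> None:
        # Zero-argument factory, called once per iteration so each iteration restarts from the source.
        self._make_iterator = make_iterator

    @staticmethod
    def of(source: Iterable[T]) -> "LazySketch[T]":
        return LazySketch(lambda: iter(source))

    def map(self, into: Callable[[T], U]) -> "LazySketch[U]":
        # Nothing is computed here: a new factory is composed on top of the previous one.
        return LazySketch(lambda: map(into, self._make_iterator()))

    def filter(self, where: Callable[[T], Any] = bool) -> "LazySketch[T]":
        return LazySketch(lambda: filter(where, self._make_iterator()))

    def __iter__(self) -> Iterator[T]:
        return self._make_iterator()
```

Building the chain calls no function; each list(...) over it reruns the whole pipeline from the source.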
- accept(visitor: Visitor[V])
Accept a visitor to traverse the stream’s operation chain.
- Parameters:
visitor (Visitor[V]) – Visitor instance that will traverse the stream chain.
- Returns:
Result of the visitor’s traversal.
- Return type:
V
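The visitor pattern behind accept can be sketched as follows. All names here (VisitorSketch, SourceNode, OpNode, DescribeVisitor) are hypothetical and do not reflect streamable's internal classes: each node calls back the visitor method for its own type, and the visitor decides how to walk upstream.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Generic, TypeVar

V = TypeVar("V")


class VisitorSketch(Generic[V]):
    """Hypothetical visitor: one method per node type (illustration only)."""

    def visit_source(self, node: SourceNode) -> V:
        raise NotImplementedError

    def visit_op(self, node: OpNode) -> V:
        raise NotImplementedError


@dataclass
class SourceNode:
    source: object

    def accept(self, visitor: VisitorSketch[V]) -> V:
        return visitor.visit_source(self)


@dataclass
class OpNode:
    name: str
    upstream: SourceNode | OpNode

    def accept(self, visitor: VisitorSketch[V]) -> V:
        return visitor.visit_op(self)


class DescribeVisitor(VisitorSketch[str]):
    """Renders a chain as 'source -> op1 -> op2 ...'."""

    def visit_source(self, node: SourceNode) -> str:
        return "source"

    def visit_op(self, node: OpNode) -> str:
        # Visit upstream first, then append this operation's name.
        return f"{node.upstream.accept(self)} -> {node.name}"
```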
- buffer(up_to: int)
Buffer upstream elements into a bounded queue (max size up_to) via a background task. This decouples the upstream production rate from the downstream consumption rate.
The background task is a thread during a sync iteration, and an async task during an async iteration.
- Parameters:
up_to (int) – The buffer size. Must be >= 1. When it is reached, upstream pulling pauses until an element is yielded out of the buffer.
- Returns:
Upstream with buffering.
- Return type:
stream[T]
Example:
import time

pulled: list[int] = []
buffered_ints = iter(
    stream(range(10))
    .do(pulled.append)
    .buffer(5)
)
assert next(buffered_ints) == 0
time.sleep(1e-3)
# 0 has been yielded out, and 1 to 5 now fill the buffer
assert pulled == [0, 1, 2, 3, 4, 5]
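A minimal, sync-only sketch of the buffering idea, assuming a background thread and a semaphore that counts free places in the buffer. buffer_sketch is hypothetical and is not streamable's implementation; unlike streamable, it stops after forwarding the first upstream error.

```python
import queue
import threading
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")

_DONE = object()  # sentinel: upstream is exhausted


class _Failure:
    """Wraps an upstream exception so it can be re-raised in the consuming thread."""

    def __init__(self, error: Exception) -> None:
        self.error = error


def buffer_sketch(iterable: Iterable[T], up_to: int) -> Iterator[T]:
    """Hypothetical sketch: a background thread pre-fetches up to `up_to` elements."""
    if up_to < 1:
        raise ValueError("up_to must be >= 1")
    free_slots = threading.Semaphore(up_to)  # places left in the buffer
    items: queue.SimpleQueue = queue.SimpleQueue()

    def produce() -> None:
        iterator = iter(iterable)
        while True:
            free_slots.acquire()  # pause upstream pulling while the buffer is full
            try:
                elem = next(iterator)
            except StopIteration:
                items.put(_DONE)
                return
            except Exception as error:
                items.put(_Failure(error))  # forward the error, then stop producing
                return
            items.put(elem)

    # Daemon thread: it does not keep the process alive if the consumer stops early.
    threading.Thread(target=produce, daemon=True).start()
    while True:
        item = items.get()
        free_slots.release()  # an element left the buffer: upstream may pull one more
        if item is _DONE:
            return
        if isinstance(item, _Failure):
            raise item.error
        yield item
```

Because a slot is acquired before each upstream pull, at most up_to elements wait in the buffer, which mirrors the pulled == [0, 1, 2, 3, 4, 5] state in the example above.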
- cast(into: Type[U])
Cast the elements into a different type. This is for type checkers only and has no impact on the iteration.
- Parameters:
into (type[U]) – Target type for stream elements.
- Returns:
The same upstream, typed as stream[U].
- Return type:
stream[U]
Example:
import json
from typing import Any

docs: stream[Any] = stream(['{"foo": "bar"}', '{"foo": "baz"}']).map(json.loads)
dicts: stream[dict[str, str]] = docs.cast(dict[str, str])
# the stream remains the same, it's for type checkers only
assert dicts is docs
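Since cast only affects static typing, it behaves like the standard library's typing.cast, which returns its argument unchanged at runtime. A minimal sketch of the same idea as a method, using a hypothetical Box class (not streamable's code):

```python
from typing import Any, Generic, Type, TypeVar, cast

T = TypeVar("T")
U = TypeVar("U")


class Box(Generic[T]):
    """Hypothetical container used only to illustrate a type-only cast."""

    def __init__(self, value: T) -> None:
        self.value = value

    def cast(self, into: Type[U]) -> "Box[U]":
        # No conversion happens: the same object is returned, only its static type changes.
        return cast("Box[U]", self)
```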
- catch(
    errors: Type[Exc] | Tuple[Type[Exc], ...],
    *,
    where: None | Callable[[Exc], Any] | Callable[[Exc], Coroutine[Any, Any, Any]] = None,
    do: None | Callable[[Exc], Any] | Callable[[Exc], Coroutine[Any, Any, Any]] = None,
    replace: None | Callable[[Exc], U] | Callable[[Exc], Coroutine[Any, Any, U]] = None,
    stop: bool = False,
  )
Catch and handle exceptions raised upstream.
An exception is caught if it is an instance of the given errors and it satisfies the where predicate, if provided.
When an exception is caught, the do callback is called, followed by replace, if provided.
- Parameters:
errors (type[Exception] | tuple[type[Exception], ...]) – Exception type(s) to catch.
where (Callable[[Exception], Any] | AsyncFunction[Exception, Any] | None, optional) – Only exceptions for which where(exc) is truthy are caught.
do (Callable[[Exception], Any] | AsyncFunction[Exception, Any] | None, optional) – do(exception) is called when an exception is caught.
replace (Callable[[Exception], U] | AsyncFunction[Exception, U] | None, optional) – replace(exception) is yielded when an exception is caught.
stop (bool, optional) – If True, iteration stops when an exception is caught.
- Returns:
Upstream with exception handling.
- Return type:
stream[T | U]
Example:
import httpx

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

# `where` a predicate is satisfied
from http import HTTPStatus

urls = [
    "https://github.com/ebonnal",
    "https://github.com/ebonnal/streamable",
    "https://github.com/ebonnal/foo",
]
responses: stream[httpx.Response] = (
    stream(urls)
    .map(httpx.get)
    .do(httpx.Response.raise_for_status)
    .catch(
        httpx.HTTPStatusError,
        where=lambda e: e.response.status_code == HTTPStatus.NOT_FOUND,
    )
)
assert len(list(responses)) == 2

# `do` a side effect on catch
errors: list[Exception] = []
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, do=errors.append)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
assert len(errors) == 1

# `replace` with a value
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replace=lambda e: float("inf"))
)
assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

# `stop=True` to stop the iteration if an exception is caught
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, stop=True)
)
assert list(inverses) == []
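A sync-only sketch of the catch semantics, assuming the upstream iterator stays usable after raising (the builtin map does: an exception raised by its function does not exhaust it). catch_sketch is hypothetical; in particular, yielding the replacement before stopping when both replace and stop are set is an assumption of this sketch, not documented behavior.

```python
from collections.abc import Callable, Iterator
from typing import Any, Optional, TypeVar, Union

T = TypeVar("T")
U = TypeVar("U")


def catch_sketch(
    upstream: Iterator[T],
    errors: Union[type[Exception], tuple[type[Exception], ...]],
    *,
    where: Optional[Callable[[Exception], Any]] = None,
    do: Optional[Callable[[Exception], Any]] = None,
    replace: Optional[Callable[[Exception], U]] = None,
    stop: bool = False,
) -> Iterator[Union[T, U]]:
    """Hypothetical sketch of catch's semantics (not streamable's code)."""
    while True:
        try:
            elem = next(upstream)
        except StopIteration:
            return
        except errors as error:
            if where is not None and not where(error):
                raise  # predicate not satisfied: the exception propagates
            if do is not None:
                do(error)  # side effect first...
            if replace is not None:
                yield replace(error)  # ...then the replacement value
            if stop:
                return
            continue  # resume pulling from the same upstream iterator
        yield elem
```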
- do(
    effect: Callable[[T], Any] | Callable[[T], Coroutine[Any, Any, Any]],
    *,
    concurrency: int | Executor = 1,
    as_completed: bool = False,
  )
Perform a side effect on each upstream element, yielding the elements unchanged.
Concurrency:
Set the concurrency param to apply the effect concurrently. At most concurrency upstream elements are in flight at a time. Upstream order is preserved unless you set as_completed=True.
- Parameters:
effect (Callable[[T], Any] | AsyncFunction[T, Any]) – The side effect to perform.
concurrency (int | Executor, optional) – Concurrency control:
  - 1 (default): Sequential processing.
  - int > 1: Concurrent, via concurrency threads, or via async tasks if effect is async.
  - Executor: Uses the provided executor (e.g., a ProcessPoolExecutor); the concurrency is the number of workers.
as_completed (bool, optional) – Order of elements:
  - False (default): Preserve upstream order.
  - True: Yield elements in the order their side effects complete.
- Returns:
Upstream with side effects.
- Return type:
stream[T]
Example:
state: list[int] = []
storing_ints: stream[int] = stream(range(10)).do(state.append)
assert list(storing_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
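The as_completed=True ordering with a bounded number of in-flight effects can be sketched with the standard library's concurrent.futures. do_as_completed_sketch is hypothetical and thread-only; it is not streamable's implementation.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, TypeVar

T = TypeVar("T")


def do_as_completed_sketch(
    iterable: Iterable[T], effect: Callable[[T], Any], concurrency: int
) -> Iterator[T]:
    """Hypothetical sketch: run `effect` in threads, at most `concurrency` in flight,
    yielding each element unchanged once its effect has completed."""
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1")
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        in_flight: dict[Future, T] = {}
        iterator = iter(iterable)
        while True:
            # Top up to `concurrency` in-flight effects.
            for elem in iterator:
                in_flight[executor.submit(effect, elem)] = elem
                if len(in_flight) >= concurrency:
                    break
            if not in_flight:
                return  # upstream exhausted and every effect completed
            done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
            for future in done:
                future.result()  # re-raise the effect's exception, if any
                yield in_flight.pop(future)
```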
- filter(where: Callable[[T], Any] | Callable[[T], Coroutine[Any, Any, Any]] = bool)
Filter elements where a predicate is truthy.
- Parameters:
where (Callable[[T], Any] | AsyncFunction[T, Any], optional) – An element is kept if where(elem) is truthy. Defaults to bool, which keeps truthy elements.
- Returns:
Filtered upstream.
- Return type:
stream[T]
Example:
even_ints: stream[int] = stream(range(10)).filter(lambda n: n % 2 == 0)
assert list(even_ints) == [0, 2, 4, 6, 8]
- flatten(*, concurrency: int = 1)
Explode upstream elements, which are assumed to be Iterable or AsyncIterable.
- Parameters:
concurrency (int, optional) – Concurrency control:
  - 1 (default): Flatten each upstream iterable before moving on to the next one.
  - int > 1: Flatten concurrency upstream iterables at a time, in a round-robin fashion (via threads, or via async tasks for AsyncIterable elements).
- Returns:
Stream of all elements from upstream iterables.
- Return type:
stream[U]
Example:
chars: stream[str] = stream(["hel", "lo!"]).flatten()
assert list(chars) == ["h", "e", "l", "l", "o", "!"]

chars: stream[str] = stream(["hel", "lo", "!"]).flatten(concurrency=2)
assert list(chars) == ["h", "l", "e", "o", "l", "!"]
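The round-robin ordering of flatten(concurrency=2) can be reproduced with a single-threaded sketch: keep up to concurrency upstream iterables open, take one element from each in turn, and replace an exhausted iterable with the next upstream one. round_robin_flatten_sketch is hypothetical and uses no threads or tasks, unlike streamable.

```python
from collections import deque
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def round_robin_flatten_sketch(iterables: Iterable[Iterable[T]], concurrency: int) -> Iterator[T]:
    """Hypothetical, single-threaded sketch of flatten(concurrency=n) ordering."""
    upstream = iter(iterables)
    active: deque[Iterator[T]] = deque()

    def refill() -> None:
        # Keep up to `concurrency` upstream iterables open at a time.
        while len(active) < concurrency:
            try:
                active.append(iter(next(upstream)))
            except StopIteration:
                return

    refill()
    while active:
        iterator = active.popleft()
        try:
            elem = next(iterator)
        except StopIteration:
            refill()  # replace the exhausted iterable with the next upstream one
            continue
        active.append(iterator)  # back of the queue: its next turn comes after the others
        yield elem
```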
- group(
    up_to: int | None = None,
    *,
    within: timedelta | None = None,
    by: None | Callable[[T], U] | Callable[[T], Coroutine[Any, Any, U]] = None,
  )
Group elements into batches:
  - up_to a given batch size,
  - within a given time interval,
  - by a given key, yielding (key, elements) pairs.
These parameters can be combined.
If an exception is encountered during grouping, the pending batch is yielded (all pending batches if by is set), and then the exception is raised.
- Parameters:
up_to (int | None, optional) – When a batch reaches this size, it is yielded.
within (timedelta | None, optional) – A batch pending for longer than within is yielded, even if it has fewer than up_to elements.
by (Callable[[T], U] | AsyncFunction[T, U] | None, optional) – Co-group elements into (key, elements) tuples.
- Returns:
stream[list[T]], or stream[tuple[U, list[T]]] if by is set.
- Return type:
stream[list[T]] | stream[tuple[U, list[T]]]
Example:
int_batches: stream[list[int]] = stream(range(10)).group(5)
assert list(int_batches) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

# `within` a given time interval
from datetime import timedelta

int_1s_batches: stream[list[int]] = (
    stream(range(10))
    .throttle(2, per=timedelta(seconds=1))
    .group(within=timedelta(seconds=0.99))
)
assert list(int_1s_batches) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

# `by` a given key, yielding `(key, elements)` pairs
ints_by_parity: stream[tuple[str, list[int]]] = (
    stream(range(10))
    .group(by=lambda n: "odd" if n % 2 else "even")
)
assert list(ints_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]
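Grouping by key with an optional size limit can be sketched with a dict of pending batches. group_by_sketch is hypothetical: it ignores within and the flush-on-error behavior described above, and it assumes remaining batches are flushed in first-seen key order when upstream is exhausted.

```python
from collections.abc import Callable, Hashable, Iterable, Iterator
from typing import Optional, TypeVar

T = TypeVar("T")
K = TypeVar("K", bound=Hashable)


def group_by_sketch(
    iterable: Iterable[T], by: Callable[[T], K], up_to: Optional[int] = None
) -> Iterator[tuple[K, list[T]]]:
    """Hypothetical sync sketch of group(up_to=..., by=...) without `within`."""
    pending: dict[K, list[T]] = {}
    for elem in iterable:
        key = by(elem)
        batch = pending.setdefault(key, [])
        batch.append(elem)
        if up_to is not None and len(batch) >= up_to:
            # A full batch is yielded immediately and its key starts over.
            yield key, pending.pop(key)
    # Upstream exhausted: flush the remaining batches (dicts keep insertion order).
    yield from pending.items()
```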
- map(
    into: Callable[[T], U] | Callable[[T], Coroutine[Any, Any, U]],
    *,
    concurrency: int | Executor = 1,
    as_completed: bool = False,
  )
Transform upstream elements.
Concurrency:
Set the concurrency param to apply the transformation concurrently. At most concurrency upstream elements are in flight at a time. Upstream order is preserved unless you set as_completed=True.
- Parameters:
into (Callable[[T], U] | AsyncFunction[T, U]) – The transformation.
concurrency (int | Executor, optional) – Concurrency control:
  - 1 (default): Sequential processing.
  - int > 1: Concurrent, via concurrency threads, or via async tasks if into is async.
  - Executor: Uses the provided executor (e.g., a ProcessPoolExecutor); the concurrency is the number of workers.
as_completed (bool, optional) – Order of results:
  - False (default): Preserve upstream order.
  - True: Yield results as they become available.
- Returns:
Mapped upstream.
- Return type:
stream[U]
Example:
import asyncio
from concurrent.futures import ProcessPoolExecutor

import httpx

int_chars: stream[str] = stream(range(10)).map(str)
assert list(int_chars) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

# concurrency via threads
pokemons: stream[str] = (
    stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .map(httpx.get, concurrency=2)
    .map(lambda poke: poke.json()["name"])
)
assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']

# concurrency via `async` coroutines within async context (inside an async function)
async with httpx.AsyncClient() as http_client:
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    assert [name async for name in pokemons] == ['bulbasaur', 'ivysaur', 'venusaur']

# concurrency via `async` coroutines within sync context (asyncio.Runner requires Python 3.11+)
with asyncio.Runner() as runner:
    http_client = httpx.AsyncClient()
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    # uses runner's loop
    assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
    runner.run(http_client.aclose())

# concurrency via processes
with ProcessPoolExecutor(max_workers=10) as processes:
    state: list[int] = []
    # ints are mapped
    assert list(
        stream(range(10))
        .map(state.append, concurrency=processes)
    ) == [None] * 10
    # the `state` of the main process is not mutated
    assert state == []

# starmap
from streamable import star

pokemon_names = ['bulbasaur', 'ivysaur', 'venusaur']
enumerated_pokes: stream[str] = (
    stream(enumerate(pokemon_names))
    .map(star(lambda index, poke: f"#{index + 1} {poke}"))
)
assert list(enumerated_pokes) == ['#1 bulbasaur', '#2 ivysaur', '#3 venusaur']
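Order-preserving concurrency with at most concurrency calls in flight can be sketched with a thread pool and a FIFO of futures: before submitting a new call when the FIFO is full, wait for the oldest one. ordered_concurrent_map_sketch is hypothetical and covers only the thread-based, as_completed=False case; it is not streamable's implementation.

```python
from collections import deque
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import Future, ThreadPoolExecutor
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def ordered_concurrent_map_sketch(
    iterable: Iterable[T], into: Callable[[T], U], concurrency: int
) -> Iterator[U]:
    """Hypothetical sketch: at most `concurrency` calls in flight, results in upstream order."""
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1")
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        in_flight: deque[Future] = deque()
        for elem in iterable:
            if len(in_flight) == concurrency:
                # Wait for the oldest call before submitting a new one: this preserves order.
                yield in_flight.popleft().result()
            in_flight.append(executor.submit(into, elem))
        while in_flight:
            yield in_flight.popleft().result()
```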
- observe(
    subject: str = 'elements',
    *,
    every: None | int | timedelta = None,
    do: Callable[[Observation], Any] | Callable[[Observation], Coroutine[Any, Any, Any]] = logging.getLogger("streamable").info,
  )
Observe the iteration progress:
  - elapsed time since the iteration started,
  - number of elements yielded upstream,
  - number of errors raised upstream.
A streamable.Observation is passed to the do callback (by default, it emits a log) at a frequency defined by every.
If do raises an error, it is silently ignored and is not included in the errors count.
- Parameters:
subject (str, optional) – Description of the elements being observed.
every (int | timedelta | None, optional) – When to emit observations:
  - None (default): When the elements or errors count reaches a power of 2.
  - int: Periodically, each time that many elements or errors have been emitted.
  - timedelta: Periodically, at that time interval.
do (Callable[[streamable.Observation], Any] | AsyncFunction[streamable.Observation, Any], optional) – Callback receiving a streamable.Observation(subject, elapsed, errors, elements).
- Returns:
Stream with progress observed.
- Return type:
stream[T]
Example:
import logging
from datetime import timedelta

from streamable import Observation

observed_ints: stream[int] = stream(range(10)).observe("ints")
assert list(observed_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# observe every 1k elements (or errors)
observed_ints = stream(range(10)).observe("ints", every=1000)

# observe every 5 seconds
observed_ints = stream(range(10)).observe("ints", every=timedelta(seconds=5))

# custom callbacks (the "my_app" logger is illustrative)
other_logger = logging.getLogger("my_app")
observed_ints = stream(range(10)).observe("ints", do=other_logger.info)

logs: list[Observation] = []
observed_ints = stream(range(10)).observe("ints", do=logs.append)

observed_ints = stream(range(10)).observe("ints", do=print)
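The default "powers of 2" schedule keeps logs sparse on long iterations: observations at 1, 2, 4, 8, ... elements, so a million elements produce about 20 observations. A sync sketch of that schedule, with hypothetical names (ObservationSketch mirrors the documented fields of streamable.Observation; observe_sketch is not streamable's code and, as a simplification, counts elements only, never errors):

```python
import time
from collections.abc import Callable, Iterable, Iterator
from datetime import timedelta
from typing import Any, NamedTuple, Optional, TypeVar

T = TypeVar("T")


class ObservationSketch(NamedTuple):
    """Hypothetical stand-in mirroring streamable.Observation's fields."""

    subject: str
    elapsed: timedelta
    errors: int
    elements: int


def is_power_of_two(count: int) -> bool:
    # A positive power of two has exactly one bit set: 1, 2, 4, 8, ...
    return count > 0 and count & (count - 1) == 0


def observe_sketch(
    iterable: Iterable[T],
    subject: str,
    do: Callable[[ObservationSketch], Any],
    every: Optional[int] = None,
) -> Iterator[T]:
    """Hypothetical sketch: emit at powers of two by default, or every `every` elements."""
    start = time.perf_counter()
    elements = 0
    for elem in iterable:
        elements += 1
        due = is_power_of_two(elements) if every is None else elements % every == 0
        if due:
            elapsed = timedelta(seconds=time.perf_counter() - start)
            try:
                do(ObservationSketch(subject, elapsed, 0, elements))
            except Exception:
                pass  # errors raised by `do` are ignored, as documented for observe
        yield elem
```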
- pipe(into: Callable[Concatenate[stream[T], P], U], *args: P.args, **kwargs: P.kwargs)
Apply a callable to the stream.
- Parameters:
into (Callable[Concatenate[stream[T], P], U]) – Function to call with the stream as its first argument, followed by *args and **kwargs.
*args (ParamSpecArgs) – Positional arguments passed to into.
**kwargs (ParamSpecKwargs) – Keyword arguments passed to into.
- Returns:
Result of into(self, *args, **kwargs).
- Return type:
U
Example:
import polars as pl

pokemons: stream[str] = ...
pokemons.pipe(pl.DataFrame, schema=["name"]).write_csv("pokemons.csv")
- skip(until)
Skip a given number of elements, or skip until a predicate is satisfied.
- Parameters:
until (int | Callable[[T], Any] | AsyncFunction[T, Any]) – Skip control:
  - int: Skip the first until elements. Must be >= 0.
  - Callable[[T], Any] | AsyncFunction[T, Any]: Skip elements until until(elem) is truthy, then yield that element and all subsequent ones.
- Returns:
Stream of the upstream elements remaining after skipping.
- Return type:
stream[T]
Example:
ints_after_5: stream[int] = stream(range(10)).skip(5)
assert list(ints_after_5) == [5, 6, 7, 8, 9]

ints_after_5: stream[int] = stream(range(10)).skip(until=lambda n: n >= 5)
assert list(ints_after_5) == [5, 6, 7, 8, 9]
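For sync iterables, both forms of skip have close standard-library analogues: itertools.islice for a count, and itertools.dropwhile with a negated predicate for until. The helper names below are hypothetical; this illustrates the semantics and is not how streamable implements skip.

```python
from collections.abc import Callable, Iterable, Iterator
from itertools import dropwhile, islice
from typing import Any, TypeVar

T = TypeVar("T")


def skip_count(iterable: Iterable[T], until: int) -> Iterator[T]:
    # Drop the first `until` elements, keep the rest.
    if until < 0:
        raise ValueError("until must be >= 0")
    return islice(iterable, until, None)


def skip_until(iterable: Iterable[T], until: Callable[[T], Any]) -> Iterator[T]:
    # Drop elements while until(elem) is falsy; the first truthy one and everything after it are kept.
    return dropwhile(lambda elem: not until(elem), iterable)
```

Note that once the predicate has matched, later elements are kept even if the predicate would be falsy for them.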
- take(until)
Take a given number of elements, or take until a predicate is satisfied. Remaining upstream elements are not consumed.
- Parameters:
until (int | Callable[[T], Any] | AsyncFunction[T, Any]) – Stop control:
  - int: Take the first until elements. Must be >= 0.
  - Callable[[T], Any] | AsyncFunction[T, Any]: Take elements until until(elem) is truthy, then stop. The matching element is not yielded.
- Returns:
Stream of taken upstream elements.
- Return type:
stream[T]
Example:
first_5_ints: stream[int] = stream(range(10)).take(5)
assert list(first_5_ints) == [0, 1, 2, 3, 4]

first_5_ints: stream[int] = stream(range(10)).take(until=lambda n: n == 5)
assert list(first_5_ints) == [0, 1, 2, 3, 4]
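The sync semantics of take map onto itertools.islice and itertools.takewhile with a negated predicate. The helper names below are hypothetical. The check after each call shows what "not consumed" means in these analogues: islice pulls nothing past the count, while takewhile must pull the matching element to test it and then discards it.

```python
from collections.abc import Callable, Iterable, Iterator
from itertools import islice, takewhile
from typing import Any, TypeVar

T = TypeVar("T")


def take_count(iterable: Iterable[T], until: int) -> Iterator[T]:
    # Yield the first `until` elements without pulling any further upstream element.
    if until < 0:
        raise ValueError("until must be >= 0")
    return islice(iterable, until)


def take_until(iterable: Iterable[T], until: Callable[[T], Any]) -> Iterator[T]:
    # Yield elements while until(elem) is falsy; the matching element is pulled but not yielded.
    return takewhile(lambda elem: not until(elem), iterable)
```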
- throttle(up_to, per)
Limit to up_to emissions (elements or exceptions) per time window.
For each new upstream element or exception: if fewer than up_to were emitted in the last per interval, it is emitted immediately; otherwise, the iteration sleeps until the oldest emission leaves the sliding window.
- Parameters:
up_to (int) – Maximum number of emissions allowed in the sliding window (per). Must be >= 1.
per (timedelta) – Duration of the sliding time window. Must be positive.
- Returns:
Stream with rate limiting.
- Return type:
stream[T]
Example:
from datetime import timedelta

throttled_ints: stream[int] = stream(range(10)).throttle(3, per=timedelta(seconds=1))
# takes about 3 seconds
assert list(throttled_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
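The sliding-window rule above can be sketched with a deque of emission timestamps: drop timestamps older than the window, and if the window is still full, sleep until the oldest one expires. throttle_sketch is hypothetical, sync-only, handles elements but not exceptions, and takes the window as a float number of seconds instead of a timedelta.

```python
import time
from collections import deque
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def throttle_sketch(iterable: Iterable[T], up_to: int, per_seconds: float) -> Iterator[T]:
    """Hypothetical sync sketch of a sliding-window rate limit (elements only)."""
    if up_to < 1:
        raise ValueError("up_to must be >= 1")
    if per_seconds <= 0:
        raise ValueError("per_seconds must be positive")
    emissions: deque[float] = deque()  # timestamps of emissions still inside the window
    for elem in iterable:
        now = time.monotonic()
        # Forget emissions that have left the sliding window.
        while emissions and now - emissions[0] >= per_seconds:
            emissions.popleft()
        if len(emissions) >= up_to:
            # Window full: sleep until the oldest emission leaves it (always a positive delay here).
            time.sleep(emissions[0] + per_seconds - now)
            emissions.popleft()
        emissions.append(time.monotonic())
        yield elem
```

With up_to=3, 10 elements are emitted in bursts at roughly 0, 1, 2 and 3 windows, which is why the example above takes about 3 seconds.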
- property source: Iterable | AsyncIterable | Callable
The source of elements wrapped by this stream.
- Returns:
Iterable | AsyncIterable | Callable
- streamable.star(func: Callable[..., R])
Transform a function (sync or async) that takes several positional arguments into a function that takes a single tuple of arguments.
Example:
from streamable import star

@star
def add(a: int, b: int) -> int:
    return a + b

assert add((2, 5)) == 7
assert star(lambda a, b: a + b)((2, 5)) == 7
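The behavior of star can be approximated in a few lines: wrap the function so that it unpacks a single tuple argument, returning an async wrapper when the wrapped function is a coroutine function. star_sketch is a hypothetical re-implementation for illustration, not streamable's source.

```python
import inspect
from collections.abc import Callable
from functools import wraps
from typing import Any


def star_sketch(func: Callable[..., Any]) -> Callable[[tuple], Any]:
    """Hypothetical re-implementation of the idea behind streamable.star."""
    if inspect.iscoroutinefunction(func):

        @wraps(func)
        async def async_starred(args: tuple) -> Any:
            # Unpack the tuple into positional arguments and await the coroutine.
            return await func(*args)

        return async_starred

    @wraps(func)
    def starred(args: tuple) -> Any:
        # Unpack the tuple into positional arguments.
        return func(*args)

    return starred
```

Keeping the async branch separate preserves awaitability, so a starred async function can still be passed to operations that run async functions concurrently.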
- class streamable.Observation(subject, elapsed, errors, elements)
Representation of the progress of an iteration over a stream.
- Parameters:
subject (str) – Human-readable description of stream elements (e.g., “cats”, “dogs”, “requests”).
elapsed (timedelta) – Time elapsed since the iteration started.
errors (int) – Number of errors encountered so far during the iteration.
elements (int) – Number of elements emitted so far.
- count(value, /)
Return the number of occurrences of value.
- index(value, start=0, stop=9223372036854775807, /)
Return the first index of value.
Raises ValueError if the value is not present.