API Reference

streamable

class streamable.stream(
source: Iterable[T] | AsyncIterable[T] | Callable[[], Coroutine[Any, Any, T]] | Callable[[], T],
)

stream[T] wraps any Iterable[T] or AsyncIterable[T] with a lazy fluent interface covering concurrency, batching, buffering, rate limiting, progress logging, and error handling.

Operations are chained lazily: source elements are processed on the fly during iteration.

Operations accept both sync and async functions, which can be mixed within the same stream. The stream can then be consumed as an Iterable or as an AsyncIterable. Async functions run in the current event loop; one is created if needed.

Operations are implemented so that the iteration can resume after an exception.

Parameters:

source (Iterable[T] | AsyncIterable[T] | Callable[[], T] | Callable[[], Coroutine[Any, Any, T]]) –

  • Iterable[T] | AsyncIterable[T]: A new iterator is created from this iterable for each stream iteration.

  • Callable[[], T] | Callable[[], Coroutine[Any, Any, T]]: Function called sequentially to get the next element.

Returns:

Stream wrapping the source.

Return type:

stream[T]

Example:

import logging
from datetime import timedelta

import httpx
from httpx import Response, HTTPStatusError
from streamable import stream

pokemons: stream[str] = (
    stream(range(10))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .throttle(5, per=timedelta(seconds=1))
    .map(httpx.get, concurrency=2)
    .do(Response.raise_for_status)
    .catch(HTTPStatusError, do=logging.warning)
    .map(lambda poke: poke.json()["name"])
)

>>> list(pokemons)
['bulbasaur', 'ivysaur', 'venusaur', ...]

>>> [poke async for poke in pokemons]
['bulbasaur', 'ivysaur', 'venusaur', ...]
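
As a rough illustration of what "the iteration can resume after an exception" means in the class description, here is a minimal pure-Python sketch. ResumableMap is a hypothetical stand-in written for this example, not streamable's implementation: it is a class-based iterator, so an exception raised for one element does not invalidate it, and the next call to next() continues with the following element.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar, Union

T = TypeVar("T")
U = TypeVar("U")


class ResumableMap(Iterator[U]):
    """Hypothetical lazy map step (illustration only, not streamable's code)."""

    def __init__(self, into: Callable[[T], U], source: Iterable[T]) -> None:
        self._into = into
        self._source = iter(source)

    def __next__(self) -> U:
        # The source iterator advances before `into` is applied, so when
        # `into` raises, the next call moves on to the following element.
        return self._into(next(self._source))


inverses = ResumableMap(lambda n: 1 / n, [1, 0, 4])
results: List[Union[float, str]] = []
while True:
    try:
        results.append(next(inverses))
    except ZeroDivisionError:
        results.append("error")  # keep iterating after the failure
    except StopIteration:
        break
```

A generator function would not behave this way: once an exception propagates out of a generator, the generator is finished, which is why the sketch uses an explicit iterator class.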
accept(
visitor: Visitor[V],
)

Accept a visitor to traverse the stream’s operation chain.

Parameters:

visitor (Visitor[V]) – Visitor instance that will traverse the stream chain.

Returns:

Result of the visitor’s traversal.

Return type:

V

buffer(
up_to: int,
)

Buffer upstream elements into a bounded queue (max size up_to), via a background task.

This decouples the upstream production rate from the downstream consumption rate.

The background task is a thread during a sync iteration, and an async task during an async iteration.

Parameters:

up_to (int) – The buffer size. Must be >= 1. When reached, upstream pulling pauses until an element is yielded out of the buffer.

Returns:

Upstream with buffering.

Return type:

stream[T]

Example:

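import time

pulled: list[int] = []
buffered_ints = iter(
    stream(range(10))
    .do(pulled.append)
    .buffer(5)
)
assert next(buffered_ints) == 0
# give the background thread a moment to refill the buffer
time.sleep(1e-3)
# 1 element yielded + 5 elements buffered
assert pulled == [0, 1, 2, 3, 4, 5]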
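
To make the bounded-queue idea concrete, the following is a minimal sketch of sync buffering using only the standard library. The function name buffered and the _DONE sentinel are hypothetical; this is not streamable's implementation, and it ignores error propagation and early termination of the consumer.

```python
import queue
import threading
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # hypothetical sentinel marking the end of the source


def buffered(source: Iterable[T], up_to: int) -> Iterator[T]:
    """Pull `source` in a background thread through a queue of size `up_to`."""
    buffer: queue.Queue = queue.Queue(maxsize=up_to)

    def produce() -> None:
        for elem in source:
            buffer.put(elem)  # blocks while the buffer is full
        buffer.put(_DONE)

    # Daemon thread so an abandoned iteration does not block interpreter exit.
    threading.Thread(target=produce, daemon=True).start()
    while (elem := buffer.get()) is not _DONE:
        yield elem


buffered_ints = list(buffered(range(10), up_to=5))
```

The blocking put is what pauses upstream pulling when the buffer is full, and each get by the consumer frees a slot for the producer.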
cast(
into: Type[U],
)

Cast the elements into a different type.

This is for type checkers only and has no impact on the iteration.

Parameters:

into (type[U]) – Target type for stream elements.

Returns:

Upstream cast to the target type.

Return type:

stream[U]

Example:

import json
from typing import Any

docs: stream[Any] = stream(['{"foo": "bar"}', '{"foo": "baz"}']).map(json.loads)
dicts: stream[dict[str, str]] = docs.cast(dict[str, str])
# the stream remains the same, it's for type checkers only
assert dicts is docs
catch(
errors: Type[Exc] | Tuple[Type[Exc], ...],
*,
where: None | Callable[[Exc], Any] | Callable[[Exc], Coroutine[Any, Any, Any]] = None,
do: None | Callable[[Exc], Any] | Callable[[Exc], Coroutine[Any, Any, Any]] = None,
replace: None | Callable[[Exc], U] | Callable[[Exc], Coroutine[Any, Any, U]] = None,
stop: bool = False,
)

Catch and handle exceptions raised upstream.

An exception is caught if it is an instance of one of the given errors and, when where is provided, it satisfies that predicate.

When an exception is caught, the do callback is called, followed by replace, if provided.

Parameters:
  • errors (type[Exception] | tuple[type[Exception], ...]) – Exception type(s) to catch.

  • where (Callable[[Exception], Any] | AsyncFunction[Exception, Any] | None, optional) – Only exceptions for which where(exc) is truthy are caught.

  • do (Callable[[Exception], Any] | AsyncFunction[Exception, Any] | None, optional) – do(exception) is called when an exception is caught.

  • replace (Callable[[Exception], U] | AsyncFunction[Exception, U] | None, optional) – replace(exception) is yielded when an exception is caught.

  • stop (bool, optional) – If True, iteration stops when an exception is caught.

Returns:

Upstream with exception handling.

Return type:

stream[T | U]

Example:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError)
)

assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

# `where` a predicate is satisfied
from http import HTTPStatus

urls = [
    "https://github.com/ebonnal",
    "https://github.com/ebonnal/streamable",
    "https://github.com/ebonnal/foo",
]
responses: stream[httpx.Response] = (
    stream(urls)
    .map(httpx.get)
    .do(httpx.Response.raise_for_status)
    .catch(
        httpx.HTTPStatusError,
        where=lambda e: e.response.status_code == HTTPStatus.NOT_FOUND,
    )
)
assert len(list(responses)) == 2

# `do` a side effect on catch
errors: list[Exception] = []
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, do=errors.append)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
assert len(errors) == 1

# `replace` with a value
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replace=lambda e: float("inf"))
)

assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

# `stop=True` to stop the iteration if an exception is caught
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, stop=True)
)

assert list(inverses) == []
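
The decision logic described for catch can be sketched in plain Python as follows. Catching is a hypothetical class written for this illustration, not streamable's implementation; it handles sync callbacks only, and yielding the replacement before stopping when both replace and stop are set is an assumption, since the reference does not specify that combination.

```python
from typing import Any, Callable, Iterator, Optional, Tuple, Type, Union

Errors = Union[Type[Exception], Tuple[Type[Exception], ...]]


class Catching:
    """Hypothetical sketch of catch semantics (sync only, illustration only)."""

    def __init__(
        self,
        upstream: Iterator[Any],
        errors: Errors,
        *,
        where: Optional[Callable[[Exception], Any]] = None,
        do: Optional[Callable[[Exception], Any]] = None,
        replace: Optional[Callable[[Exception], Any]] = None,
        stop: bool = False,
    ) -> None:
        self._upstream, self._errors = upstream, errors
        self._where, self._do, self._replace = where, do, replace
        self._stop, self._stopped = stop, False

    def __iter__(self) -> "Catching":
        return self

    def __next__(self) -> Any:
        while not self._stopped:
            try:
                return next(self._upstream)
            except StopIteration:
                raise
            except self._errors as exc:
                if self._where is not None and not self._where(exc):
                    raise  # predicate not satisfied: not caught
                if self._do is not None:
                    self._do(exc)
                self._stopped = self._stop
                if self._replace is not None:
                    return self._replace(exc)  # assumption: yielded even if stop=True
        raise StopIteration


seen: list = []
# The built-in map iterator can be resumed after its function raises.
inverses = list(
    Catching(
        map(lambda n: round(1 / n, 2), range(4)),
        ZeroDivisionError,
        do=seen.append,
        replace=lambda exc: float("inf"),
    )
)
stopped = list(Catching(map(lambda n: 1 / n, range(4)), ZeroDivisionError, stop=True))
```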
do(
effect: Callable[[T], Any] | Callable[[T], Coroutine[Any, Any, Any]],
*,
concurrency: int | Executor = 1,
as_completed: bool = False,
)

Perform a side effect on each upstream element, yielding them unchanged.

Concurrency:

  • Set the concurrency param to apply the effect concurrently.

  • At most concurrency upstream elements are in flight for processing at any time.

  • Preserve upstream order unless you set as_completed=True.

Parameters:
  • effect (Callable[[T], Any] | AsyncFunction[T, Any]) – The side effect to perform.

  • concurrency (int | Executor, optional) –

    Concurrency control:

    • 1 (default): Sequential processing.

    • int > 1: Concurrent via concurrency threads, or via async tasks if effect is async.

    • Executor: Uses the provided executor (e.g., a ProcessPoolExecutor); the concurrency is the number of workers.

  • as_completed (bool, optional) –

    Order of elements:

    • False (default): Preserve upstream order.

    • True: Yield elements in the order their side effects complete.

Returns:

Upstream with side effects.

Return type:

stream[T]

Example:

state: list[int] = []
storing_ints: stream[int] = stream(range(10)).do(state.append)
assert list(storing_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
filter(
where: Callable[[T], Any] | Callable[[T], Coroutine[Any, Any, Any]] = bool,
)

Filter elements where a predicate is truthy.

Parameters:

where (Callable[[T], Any] | AsyncFunction[T, Any], optional) – An element is kept if where(elem) is truthy.

Returns:

Filtered upstream.

Return type:

stream[T]

Example:

even_ints: stream[int] = stream(range(10)).filter(lambda n: n % 2 == 0)
assert list(even_ints) == [0, 2, 4, 6, 8]
flatten(
*,
concurrency: int = 1,
)

Flatten upstream elements, each assumed to be an Iterable or AsyncIterable, into a stream of their items.

Parameters:

concurrency (int, optional) –

Concurrency control:

  • 1 (default): Flatten each upstream iterable before moving on to the next one.

  • int > 1: Flatten concurrency upstream iterables at a time in a round-robin fashion (via threads, or via async tasks for AsyncIterable elements).

Returns:

Stream of all elements from upstream iterables.

Return type:

stream[U]

Example:

chars: stream[str] = stream(["hel", "lo!"]).flatten()
assert list(chars) == ["h", "e", "l", "l", "o", "!"]

chars: stream[str] = stream(["hel", "lo", "!"]).flatten(concurrency=2)
assert list(chars) == ["h", "l", "e", "o", "l", "!"]
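
The round-robin ordering shown in the second example can be reproduced with a sequential sketch. The function flatten_round_robin is hypothetical and single-threaded; it only models the order in which elements come out, not streamable's thread-based or task-based implementation.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
_MISSING = object()  # hypothetical sentinel for "iterator exhausted"


def flatten_round_robin(
    iterables: Iterable[Iterable[T]], concurrency: int = 1
) -> Iterator[T]:
    """Yield one element at a time from up to `concurrency` active iterables."""
    upstream = iter(iterables)
    active: Deque[Iterator[T]] = deque()
    while True:
        # Top up the set of active iterables from upstream.
        while len(active) < concurrency:
            nxt = next(upstream, _MISSING)
            if nxt is _MISSING:
                break
            active.append(iter(nxt))
        if not active:
            return
        current = active.popleft()
        elem = next(current, _MISSING)
        if elem is not _MISSING:
            yield elem
            active.append(current)  # go to the back of the rotation


sequential = list(flatten_round_robin(["hel", "lo!"]))
interleaved = list(flatten_round_robin(["hel", "lo", "!"], concurrency=2))
```

With concurrency=2, "hel" and "lo" alternate until "lo" is exhausted, at which point "!" takes its slot.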
group(
up_to: int | None = None,
*,
within: timedelta | None = None,
by: None | Callable[[T], U] | Callable[[T], Coroutine[Any, Any, U]] = None,
)

Group elements into batches:

  • up_to a given batch size

  • within a given time interval

  • by a given key, yielding (key, elements) pairs

You can combine these parameters.

If an exception is encountered during grouping, the pending batch is yielded (all the pending batches if by is set), and then the exception is raised.

Parameters:
  • up_to (int | None, optional) – If a batch reaches that size, it is yielded.

  • within (timedelta | None, optional) – A batch pending for more than within is yielded, even if under up_to elements.

  • by (Callable[[T], U] | AsyncFunction[T, U] | None, optional) – Co-group elements into (key, elements) tuples.

Returns:

stream[list[T]], or stream[tuple[U, list[T]]] if by is set.

Return type:

stream[list[T]] | stream[tuple[U, list[T]]]

Example:

int_batches: stream[list[int]] = stream(range(10)).group(5)

assert list(int_batches) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

# `within` a given time interval
from datetime import timedelta
int_1s_batches: stream[list[int]] = (
    stream(range(10))
    .throttle(2, per=timedelta(seconds=1))
    .group(within=timedelta(seconds=0.99))
)

assert list(int_1s_batches) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

# `by` a given key, yielding `(key, elements)` pairs
ints_by_parity: stream[tuple[str, list[int]]] = (
    stream(range(10))
    .group(by=lambda n: "odd" if n % 2 else "even")
)

assert list(ints_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]
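
The size-based and key-based batching above can be approximated with a short generator. The function grouped is a hypothetical sketch, not streamable's implementation: it leaves out within and exception handling, and the way it combines up_to with by (a key's batch is yielded as soon as it reaches up_to, and leftovers are yielded at the end in first-seen key order) is an assumption.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional


def grouped(
    source: Iterable[Any],
    up_to: Optional[int] = None,
    by: Optional[Callable[[Any], Any]] = None,
) -> Iterator[Any]:
    """Yield lists of elements, or (key, list) pairs when `by` is given."""
    pending: Dict[Any, List[Any]] = {}
    for elem in source:
        key = by(elem) if by is not None else None
        batch = pending.setdefault(key, [])
        batch.append(elem)
        if up_to is not None and len(batch) >= up_to:
            del pending[key]
            yield (key, batch) if by is not None else batch
    # Assumption: batches still pending at the end are yielded as-is.
    for key, batch in pending.items():
        yield (key, batch) if by is not None else batch


int_batches = list(grouped(range(10), up_to=5))
ints_by_parity = list(grouped(range(10), by=lambda n: "odd" if n % 2 else "even"))
```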
map(
into: Callable[[T], U] | Callable[[T], Coroutine[Any, Any, U]],
*,
concurrency: int | Executor = 1,
as_completed: bool = False,
)

Transform upstream elements.

Concurrency:

  • Set the concurrency param to apply the transformation concurrently.

  • At most concurrency upstream elements are in flight for processing at any time.

  • Preserve upstream order unless you set as_completed=True.

Parameters:
  • into (Callable[[T], U] | AsyncFunction[T, U]) – The transformation.

  • concurrency (int | Executor, optional) –

    Concurrency control:

    • 1 (default): Sequential processing.

    • int > 1: Concurrent via concurrency threads, or via async tasks if into is async.

    • Executor: Uses the provided executor (e.g., a ProcessPoolExecutor); the concurrency is the number of workers.

  • as_completed (bool, optional) –

    Order of results:

    • False (default): Preserve upstream order.

    • True: Yield results as they become available.

Returns:

Mapped upstream.

Return type:

stream[U]

Example:

int_chars: stream[str] = stream(range(10)).map(str)
assert list(int_chars) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

# concurrency via threads
pokemons: stream[str] = (
    stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .map(httpx.get, concurrency=2)
    .map(lambda poke: poke.json()["name"])
)
assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']

# concurrency via `async` coroutines within async context
async with httpx.AsyncClient() as http_client:
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    assert [name async for name in pokemons] == ['bulbasaur', 'ivysaur', 'venusaur']

# concurrency via `async` coroutines within sync context
with asyncio.Runner() as runner:
    http_client = httpx.AsyncClient()
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    # uses runner's loop
    assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
    runner.run(http_client.aclose())

# concurrency via processes
with ProcessPoolExecutor(max_workers=10) as processes:
    state: list[int] = []
    # ints are mapped
    assert list(
        stream(range(10))
        .map(state.append, concurrency=processes)
    ) == [None] * 10
    # the `state` of the main process is not mutated
    assert state == []

# starmap
from streamable import star
# `pokemons` is assumed here to stream the names of the first 9 species
enumerated_pokes: stream[str] = (
    stream(enumerate(pokemons))
    .map(star(lambda index, poke: f"#{index + 1} {poke}"))
)
assert list(enumerated_pokes) == ['#1 bulbasaur', '#2 ivysaur', '#3 venusaur', '#4 charmander', '#5 charmeleon', '#6 charizard', '#7 squirtle', '#8 wartortle', '#9 blastoise']
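
The two concurrency guarantees of map (a bounded number of in-flight elements, and upstream order preserved by default) can be sketched with a thread pool and a queue of futures. The function ordered_concurrent_map is hypothetical and is not streamable's implementation; it covers only the sync, as_completed=False case.

```python
import time
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def ordered_concurrent_map(
    into: Callable[[T], U], source: Iterable[T], concurrency: int
) -> Iterator[U]:
    """Apply `into` in threads, keeping at most `concurrency` calls in flight."""
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        in_flight: Deque[Future] = deque()
        for elem in source:
            in_flight.append(pool.submit(into, elem))
            if len(in_flight) == concurrency:
                # Wait for the oldest submission: this preserves upstream order.
                yield in_flight.popleft().result()
        while in_flight:
            yield in_flight.popleft().result()


def slow_double(n: int) -> int:
    # Even inputs take longer, so odd inputs finish first within a window.
    time.sleep(0.05 if n % 2 == 0 else 0.0)
    return 2 * n


doubled = list(ordered_concurrent_map(slow_double, range(6), concurrency=3))
```

Results come out in upstream order even though some later calls finish earlier; yielding in completion order instead would mean waiting on whichever future finishes first.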
observe(
subject: str = 'elements',
*,
every: None | int | timedelta = None,
do: Callable[[Observation], Any] | Callable[[Observation], Coroutine[Any, Any, Any]] = logging.getLogger("streamable").info,
)

Observe the iteration progress:

  • elapsed time since the iteration started

  • number of elements yielded upstream

  • number of errors raised upstream

A streamable.Observation is passed to the do callback (the default emits a log), at a frequency defined by every.

If an error is raised by do, it is silently ignored and is not included in the errors count.

Parameters:
  • subject (str, optional) – Description of elements being observed.

  • every (int | timedelta | None, optional) –

    When to emit observations:

    • None (default): When the elements/errors counts reach powers of 2.

    • int: Each time every more elements (or errors) have been emitted.

    • timedelta: Once per every time interval.

  • do (Callable[[streamable.Observation], Any] | AsyncFunction[streamable.Observation, Any], optional) – Callback receiving a streamable.Observation (subject, elapsed, errors, elements).

Returns:

Stream with progress observed.

Return type:

stream[T]

Example:

observed_ints: stream[int] = stream(range(10)).observe("ints")
assert list(observed_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# observe every 1k elements (or errors)
observed_ints = stream(range(10)).observe("ints", every=1000)
# observe every 5 seconds
observed_ints = stream(range(10)).observe("ints", every=timedelta(seconds=5))

# custom `do` callbacks (`other_logger` and `logs` are assumed to be defined by the caller)
observed_ints = stream(range(10)).observe("ints", do=other_logger.info)
observed_ints = stream(range(10)).observe("ints", do=logs.append)
observed_ints = stream(range(10)).observe("ints", do=print)
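
For the default every=None schedule, observations are emitted when a count reaches a power of 2. The following sketch shows which counts qualify for a stream of 10 elements; is_power_of_two is a hypothetical helper, and treating 1 (that is, 2**0) as a power of 2 is an assumption about the library's behavior.

```python
def is_power_of_two(count: int) -> bool:
    """True for 1, 2, 4, 8, ... using the single-set-bit trick."""
    return count > 0 and count & (count - 1) == 0


# Counts at which an observation would be emitted for 10 elements.
observed_counts = [count for count in range(1, 11) if is_power_of_two(count)]
```

This schedule keeps the number of observations logarithmic in the number of elements, so long iterations do not flood the log.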
pipe(
into: Callable[Concatenate[stream[T], P], U],
*args: P.args,
**kwargs: P.kwargs,
)

Apply a callable on the stream.

Parameters:
  • into (Callable[Concatenate[stream[T], P], U]) – Function to call with the stream as first argument, followed by *args and **kwargs.

  • *args (ParamSpecArgs) – Positional arguments passed to into.

  • **kwargs (ParamSpecKwargs) – Keyword arguments passed to into.

Returns:

Result of into(self, *args, **kwargs).

Return type:

U

Example:

import polars as pl
pokemons: stream[str] = ...
pokemons.pipe(pl.DataFrame, schema=["name"]).write_csv("pokemons.csv")
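
Since pipe returns into(self, *args, **kwargs), its behavior can be mimicked by a free function. The function below is a hypothetical stand-in that works on any object, shown only to make the argument forwarding explicit.

```python
from typing import Any, Callable, TypeVar

S = TypeVar("S")
R = TypeVar("R")


def pipe(obj: S, into: Callable[..., R], *args: Any, **kwargs: Any) -> R:
    """Call `into` with `obj` as first argument, forwarding the other arguments."""
    return into(obj, *args, **kwargs)


descending = pipe([2, 0, 1], sorted, reverse=True)
```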
skip(
until: int | Callable[[T], Any] | Callable[[T], Coroutine[Any, Any, Any]],
)

Skip a given number of elements, or skip until a predicate is satisfied.

Parameters:

until (int | Callable[[T], Any] | AsyncFunction[T, Any]) –

Skip control:

  • int: Skip first until elements. Must be >= 0.

  • Callable[[T], Any] | AsyncFunction[T, Any]: Skip until until(elem) is truthy, then yield that element and all subsequent.

Returns:

Stream of remaining upstream elements after skipping.

Return type:

stream[T]

Example:

ints_after_5: stream[int] = stream(range(10)).skip(5)
assert list(ints_after_5) == [5, 6, 7, 8, 9]

ints_after_5: stream[int] = stream(range(10)).skip(until=lambda n: n >= 5)
assert list(ints_after_5) == [5, 6, 7, 8, 9]
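
For readers familiar with itertools, the two forms of skip behave much like islice and dropwhile on a plain iterable. This is an analogy using the standard library only, not a statement about how streamable implements skip.

```python
from itertools import dropwhile, islice

# skip(5): drop the first 5 elements.
skipped_by_count = list(islice(range(10), 5, None))

# skip(until=pred): drop elements while the predicate is NOT satisfied,
# then yield the matching element and everything after it.
skipped_by_predicate = list(dropwhile(lambda n: not n >= 5, range(10)))
```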
take(
until: int | Callable[[T], Any] | Callable[[T], Coroutine[Any, Any, Any]],
)

Take a given number of elements, or take until a predicate is satisfied. Remaining upstream elements are not consumed.

Parameters:

until (int | Callable[[T], Any] | AsyncFunction[T, Any]) –

Stop control:

  • int: Take first until elements. Must be >= 0.

  • Callable[[T], Any] | AsyncFunction[T, Any]: Take until until(elem) is truthy, then stop. Matching element is not yielded.

Returns:

Stream of taken upstream elements.

Return type:

stream[T]

Example:

first_5_ints: stream[int] = stream(range(10)).take(5)
assert list(first_5_ints) == [0, 1, 2, 3, 4]

first_5_ints: stream[int] = stream(range(10)).take(until=lambda n: n == 5)
assert list(first_5_ints) == [0, 1, 2, 3, 4]
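
The two forms of take have close standard-library analogues in islice and takewhile. The sketch below is an analogy only; in particular, takewhile pulls the matching element from its source before stopping, and whether streamable does the same is not stated in this reference.

```python
from itertools import islice, takewhile

numbers = iter(range(10))

# take(5): yield the first 5 elements and stop pulling from the source.
taken_by_count = list(islice(numbers, 5))
# The source was not consumed past the 5th element.
next_unconsumed = next(numbers)

# take(until=pred): yield elements until the predicate is satisfied;
# the matching element itself is not yielded.
taken_by_predicate = list(takewhile(lambda n: n != 5, range(10)))
```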
throttle(
up_to: int,
*,
per: timedelta,
)

Limit to up_to emissions (elements or exceptions) per time window.

For each new upstream element or exception: if fewer than up_to were emitted in the last per interval, it is emitted immediately; otherwise the stream sleeps until the oldest emission leaves the sliding window.

Parameters:
  • up_to (int) – Maximum emissions allowed in the sliding window (per). Must be >= 1.

  • per (timedelta) – Duration of the sliding time window. Must be positive.

Returns:

Stream with rate limiting.

Return type:

stream[T]

Example:

from datetime import timedelta
throttled_ints: stream[int] = stream(range(10)).throttle(3, per=timedelta(seconds=1))
# takes 3 seconds
assert list(throttled_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
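
The sliding-window rule can be sketched with a deque holding the timestamps of the most recent emissions. The function throttled is hypothetical, not streamable's implementation: it throttles elements only (not exceptions), and its clock and sleep parameters exist solely so that the demonstration below can run against a fake clock instead of waiting in real time.

```python
import time
from collections import deque
from datetime import timedelta
from typing import Callable, Deque, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def throttled(
    source: Iterable[T],
    up_to: int,
    per: timedelta,
    *,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[T]:
    """Yield at most `up_to` elements per sliding window of length `per`."""
    window = per.total_seconds()
    emissions: Deque[float] = deque()  # timestamps of the last `up_to` emissions
    for elem in source:
        if len(emissions) == up_to:
            # Sleep until the oldest emission leaves the window.
            wait = emissions[0] + window - clock()
            if wait > 0:
                sleep(wait)
            emissions.popleft()
        emissions.append(clock())
        yield elem


# Fake clock: sleeping simply advances the simulated time.
now = [0.0]
emitted_at: List[float] = []
for _ in throttled(
    range(10),
    3,
    timedelta(seconds=1),
    clock=lambda: now[0],
    sleep=lambda seconds: now.__setitem__(0, now[0] + seconds),
):
    emitted_at.append(now[0])
```

In the simulation the tenth element is emitted at the 3-second mark, consistent with the "takes 3 seconds" comment in the example above.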
property source: Iterable | AsyncIterable | Callable

The source of elements wrapped by this stream.

Returns:

Iterable | AsyncIterable | Callable

property upstream: stream | None

The parent stream in the operation chain, if any.

Returns:

stream | None

streamable.star(
func: Callable[[...], R],
)

Transform a function (sync or async) that takes several positional arguments into a function that takes a tuple.

@star
def add(a: int, b: int) -> int:
    return a + b

assert add((2, 5)) == 7

assert star(lambda a, b: a + b)((2, 5)) == 7

Return type:

Callable[[tuple[Any, ...]], R]
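
A decorator with this behavior can be written in a few lines for the sync case. star_sketch is a hypothetical name used to avoid confusion with the real streamable.star, whose implementation may differ; async functions are not given any special treatment in this sketch.

```python
import functools
from typing import Any, Callable, Tuple, TypeVar

R = TypeVar("R")


def star_sketch(func: Callable[..., R]) -> Callable[[Tuple[Any, ...]], R]:
    """Wrap `func` so that it takes one tuple and unpacks it into positional args."""

    @functools.wraps(func)
    def wrapper(args: Tuple[Any, ...]) -> R:
        return func(*args)

    return wrapper


@star_sketch
def add(a: int, b: int) -> int:
    return a + b


total = add((2, 5))
```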

class streamable.Observation(
subject: str,
elapsed: timedelta,
errors: int,
elements: int,
)

Representation of the progress of iteration over a stream.

Parameters:
  • subject (str) – Human-readable description of stream elements (e.g., “cats”, “dogs”, “requests”).

  • elapsed (timedelta) – Time elapsed since iteration started.

  • errors (int) – Number of errors encountered during iteration so far.

  • elements (int) – Number of elements emitted so far.

count(
value,
/,
)

Return number of occurrences of value.

index(
value,
start=0,
stop=9223372036854775807,
/,
)

Return first index of value.

Raises ValueError if the value is not present.

elapsed: timedelta

Alias for field number 1

elements: int

Alias for field number 3

errors: int

Alias for field number 2

subject: str

Alias for field number 0
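
The "Alias for field number" entries and the inherited count and index methods suggest that Observation is a named tuple. Under that assumption, a do callback can read fields by name, by position, or by unpacking. ObservationSketch below is a hypothetical local stand-in with the same four fields, not the library's class.

```python
from datetime import timedelta
from typing import NamedTuple


class ObservationSketch(NamedTuple):
    """Hypothetical stand-in mirroring the documented fields and their order."""

    subject: str
    elapsed: timedelta
    errors: int
    elements: int


observation = ObservationSketch("ints", timedelta(seconds=2), errors=0, elements=10)

# Fields are available by name, by index, or through tuple unpacking.
subject, elapsed, errors, elements = observation
rate = observation.elements / observation.elapsed.total_seconds()
```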