Skip to content

AsyncStreamSet

pingthings.timeseries.async_client.AsyncStreamSet

AsyncStreamSet(streams: list[AsyncStream] = [])

A collection of AsyncStream objects.

Create a collection of AsyncStream objects.

METHOD DESCRIPTION
aligned_windows

Return statistical summary timeseries data aligned with the internal tree structure of the database.

changes

Return a mapping of intervals of time that have changed between two versions for each stream.

count

Get the total count of raw measurements that are present in each stream.

earliest

Find the earliest point (in time) that is present in the streams.

filter

Create a new AsyncStreamSet that is a subset of the streams based on metadata filtering.

flush

Force a flush of the buffered data to persistent storage for all streams.

get_latest_version

Get the latest version of each stream.

get_version_at_time

Get the version of each stream at time realtime.

insert

Insert new timeseries data into the streams.

latest

Find the latest point (in time) that is present in the streams.

pin_versions

Pin the versions of all streams to specific values.

precise_windows

Return custom-sized statistical summaries of the data.

raw_values

Return the raw time,value pairs of data from the streams.

refresh_metadata

Update all metadata for each stream in the AsyncStreamSet

Functions

aligned_windows async

aligned_windows(
    start: int | datetime,
    end: int | datetime,
    point_width: int | _PW,
    versions: Optional[list[int]] = None,
    single_stream_schema: Optional[
        Schema
    ] = STAT_F32_SCHEMA,
) -> list[Table | None]

Return statistical summary timeseries data aligned with the internal tree structure of the database.

Query BTrDB for aggregates (or roll ups or windows) of the time series with version between time start (inclusive) and end (exclusive) in nanoseconds [start, end). Each point returned is a statistical aggregate of all the raw data within a window of width 2**pointwidth nanoseconds. These statistical aggregates currently include the mean, minimum, maximum, count, and standard deviation of the data composing the window.

Understanding aligned_windows queries

  • start is inclusive, but end is exclusive. Results will be returned for all windows that start in the interval \([start, end)\).
  • If \(end < start + 2^{pointwidth}\), you will not get any results.
  • If start and end are not powers of two, the bottom pointwidth bits will be cleared, aligning them to the nearest multiple of \(2^{pointwidth}\). For example, if you query data between [31, 121) with pointwidth = 4, the actual query will be performed on [16, 112].
  • Each window will contain statistical summaries of the window.
  • Statistical points with count == 0 will be omitted.
PARAMETER DESCRIPTION
start

Start time to get data (inclusive)

TYPE: int | datetime

end

End time for the data query (exclusive)

TYPE: int | datetime

point_width

What size statistical windows to return, aligned to a size of the internal tree. Value will be interpreted as \(2^{point\_width}\).

TYPE: int | _PW

versions

What version of the stream to query against, using the default of None will use version 0.

TYPE: Optional[list[int]] DEFAULT: None

single_stream_schema

What pyarrow.Schema should the time,statistical summary timeseries be returned as, default will be the server default.

TYPE: Optional[Schema] DEFAULT: STAT_F32_SCHEMA

RETURNS DESCRIPTION
list[Table | None]

Statistical summary timeseries of the streams between \([start, end)\)

changes async

Return a mapping of intervals of time that have changed between two versions for each stream.

Advanced user feature

This method can have unexpected return values if you are not familiar with the underlying timeseries database that is part of the PredictiveGrid platform.

Time ranges can overlap with previously inserted data

The returned timeranges will not be exact start and end times for the data inserted. Instead, they will be aligned as closely as they can be, but also in aligment with the underlying database's tree structure.

PARAMETER DESCRIPTION
from_version_map

The stream versions to compare from (exclusive).

TYPE: dict[UUID | str, int]

to_version_map

The stream versions to compare to (inclusive).

TYPE: dict[UUID | str, int]

resolution

The "coarseness" of change detection in tree width. A resolution of 0 typically returns the finest granularity of changed intervals (which is still aligned with our internal tree structure), a resolution of 30, would be aligned exactly as a pointwidth of 2^30 nanoseconds. This resolution is the same as a pointwidth, please refer to the explanation page here on stat point widths of our tree.

DEFAULT: 0

RETURNS DESCRIPTION
chaanges

A dictionary keyed on stream UUID's where the value is a list of dictionaries representing

TYPE: dict[UUID, list[dict[str, int]]]

dict[UUID, list[dict[str, int]]]

changed intervals. Typically the return

dict[UUID, list[dict[str, int]]]

looks like: {UUID: [{"start": <nanosecond_timestamp>, "end": <nanosecond_timestamp>}, ...].

Examples:

>>> # Suppose we want to iterate over all changes between version 9 and 42
>>> # and we have a two stream streamset
>>> from_version_map = {s.uuid: 9 for s in streamset}
>>> to_version_map = {s.uuid: 42 for s in streamset}
>>> changed_range = await streamset.changes(from_version_map=from_version_map,
...                                        to_version_map=to_version_map, resolution=0):
>>> for stream_uuid, changed_range in changed_range.items():
...     print(f"Stream UUID: {stream_uuid} Changed intervals: {changed_range}")
>>> # Suppose we want to iterate over all changes between version 42 and the latest data that has not been inserted yet
>>> from_version_map = {s.uuid: 42 for s in streamset}
>>> to_version_map = {s.uuid: 0 for s in streamset}
>>> changed_range = await streamset.changes(from_version_map=from_version_map,
...                                        to_version_map=to_version_map, resolution=0):
>>> for stream_uuid, changed_range in changed_range.items():
...     print(f"Stream UUID: {stream_uuid} Changed intervals: {changed_range}")

count async

count(
    start: Optional[int] = None,
    end: Optional[int] = None,
    versions: Optional[int | dict[UUID, int]] = None,
    precise: bool = False,
) -> dict[UUID, int]

Get the total count of raw measurements that are present in each stream.

PARAMETER DESCRIPTION
start

Bound the lower end of this query by a start time, by default MINIMUM_TIME.

TYPE: Optional[int] DEFAULT: None

end

Bound the upper end of this query by an end time, by default MAXIMUM_TIME

TYPE: Optional[int] DEFAULT: None

versions

Versions of the streams to query against, by default None, which means a version of 0 is used.

TYPE: Optional[int | dict[UUID, int]] DEFAULT: None

precise

Do we need an exact count or is an estimate reasonable, by default False

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[UUID, int]

Mapping of individual stream.uuid's to count values.

earliest async

earliest() -> dict[UUID, Optional[Point]]

Find the earliest point (in time) that is present in the streams.

RETURNS DESCRIPTION
dict[UUID, Optional[Point]]

The earliest points in the streams.

filter async

Create a new AsyncStreamSet that is a subset of the streams based on metadata filtering.

PARAMETER DESCRIPTION
collection

The collection string to filter by, by default None

TYPE: Optional[str] DEFAULT: None

tags

Tag metadata to filter on, by default None

TYPE: Optional[dict[str, str]] DEFAULT: None

annotations

Annotation metadata to filter on, by default None

TYPE: Optional[dict[str, str]] DEFAULT: None

refresh_metadata

Should we use the latest metadata of the streams to filter on, by default True

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Optional[AsyncStreamSet]

A subset of the streams that match the provided filters, if any.

flush async

flush() -> dict[UUID, int]

Force a flush of the buffered data to persistent storage for all streams.

If data was present, the version number will be positively incremented.

RETURNS DESCRIPTION
dict[UUID, int]

The version number of each stream after the flush as a uuid:version dictionary mapping.

get_latest_version async

get_latest_version() -> dict[UUID, int]

Get the latest version of each stream.

RETURNS DESCRIPTION
dict[UUID, int]

A stream.uuid, latest version mapping.

get_version_at_time async

get_version_at_time(
    realtime: int | datetime,
) -> dict[UUID, int]

Get the version of each stream at time realtime.

PARAMETER DESCRIPTION
realtime

The time to check the stream version against

TYPE: int | datetime

RETURNS DESCRIPTION
dict[UUID, int]

A stream.uuid, version mapping.

insert async

insert(
    data_map: dict[UUID, Table],
    merge_policy: MergePolicy = "replace",
) -> dict[UUID, int]

Insert new timeseries data into the streams.

Default merge policy has changed!

Starting with the new pingthings api, the default merge policy is now replace. Please refer to the merge policy docs

Data must follow a specific schema for insertion

Your data pyarrow table or record batch must have a schema that matches the TIME_VALUE_F64_SCHEMA, defined. Order of the columns matter.

PARAMETER DESCRIPTION
data_map

A mapping of stream.uuid to pyarrow.Table timeseries data.

TYPE: dict[UUID, Table]

merge_policy

How to handle duplicate timestamps in the data

TYPE: MergePolicy DEFAULT: 'replace'

RETURNS DESCRIPTION
dict[UUID, int]

A mapping of stream.uuid, version of the stream after insertion.

latest async

latest() -> dict[UUID, Optional[Point]]

Find the latest point (in time) that is present in the streams.

RETURNS DESCRIPTION
dict[UUID, Optional[Point]]

The latest point in the streams, if present.

pin_versions async

pin_versions(versions: Optional[list[int]] = None) -> None

Pin the versions of all streams to specific values.

precise_windows async

precise_windows(
    start: int | datetime,
    end: int | datetime,
    width: int,
    depth: Optional[int] = 0,
    versions: Optional[list[int]] = None,
    single_stream_schema: Optional[
        Schema
    ] = STAT_F32_SCHEMA,
) -> list[Table | None]

Return custom-sized statistical summaries of the data.

Understanding windows queries

  • windows returns arbitrary precision statistical summary windows from the platform. It is slower than aligned_windows, but can be significantly faster than raw value queries (raw_values).
  • Each returned window will be width nanoseconds long.
  • start is inclusive, but end is exclusive (e.g., if end < start + width you will get no results).
  • Results will be returned for all windows that start at a time less than the end timestamp.
  • If (end - start) is not a multiple of width, then end will be decreased to the greatest value less than end such that (end - start) is a multiple of width (i.e., we set end = start + width * floordiv(end - start, width)).
  • Windows that have no data points count==0 will be omitted from the returned table
PARAMETER DESCRIPTION
start

Start time to get data (inclusive)

TYPE: int | datetime

end

End time for the data query (exclusive)

TYPE: int | datetime

width

The size of statistical windows to return, will contain summaries of the data between start to end of size width.

TYPE: int

depth

What is the maximum tradeoff in computation of the statistical summaries by how far we need to walk down the tree, by default 0 is the most accurate, and a range of 0->63 can be used based on the time range of data.

TYPE: Optional[int] DEFAULT: 0

versions

What version of the streams to query against, using the default of None will use version 0.

TYPE: Optional[list[int]] DEFAULT: None

single_stream_schema

What pyarrow.Schema should the time,statistical summary timeseries be returned as, default will be the server default.

TYPE: Optional[Schema] DEFAULT: STAT_F32_SCHEMA

RETURNS DESCRIPTION
list[Table | None]

The statistical summary information of the streams as a timeseries.

raw_values async

raw_values(
    start: int | datetime,
    end: int | datetime,
    snap_period: int | timedelta = 0,
    versions: Optional[list[int]] = None,
    schema: Optional[Schema] = None,
) -> Table

Return the raw time,value pairs of data from the streams.

PARAMETER DESCRIPTION
start

Start time to get data (inclusive)

TYPE: int | datetime

end

End time for the data query (exclusive)

TYPE: int | datetime

snap_period

What period of time (if any) should the data be aligned to?

TYPE: int | timedelta DEFAULT: 0

versions

What versions of the streams to query against, using the default of None will use version 0.

TYPE: Optional[list[int]] DEFAULT: None

schema

What pyarrow.Schema should the time,values be returned as, default will be the server default.

TYPE: Optional[Schema] DEFAULT: None

RETURNS DESCRIPTION
Table

A table of timeseries data in the interval of \([start, end)\)

refresh_metadata async

refresh_metadata() -> None

Update all metadata for each stream in the AsyncStreamSet