
StreamSet

pingthings.timeseries.client.StreamSet

StreamSet(streams: list[Stream] = [])

A collection of Stream objects.

Create a collection of stream objects.

METHOD DESCRIPTION
changes

Return a mapping of intervals of time that have changed between two versions for each stream.

count

Get the total count of raw measurements that are present in each stream.

earliest

Find the earliest point (in time) that is present in the streams.

filter

Create a new StreamSet that is a subset of the streams based on metadata filtering.

flush

Force a flush of the buffered data to persistent storage for all streams.

get_latest_version

Get the latest version of each stream.

get_version_at_time

Get the version of each stream at time realtime.

insert

Insert new timeseries data into the streams.

latest

Find the latest point (in time) that is present in the streams.

pin_versions

Pin the versions of all streams to specific values.

raw_values

Return the raw time,value pairs of data from the streams.

refresh_metadata

Update all metadata for each stream in the StreamSet.

windowed_values

Return statistical summaries of the data.

Functions

changes

Return a mapping of intervals of time that have changed between two versions for each stream.

Advanced user feature

This method can have unexpected return values if you are not familiar with the underlying timeseries database that is part of the PredictiveGrid platform.

Time ranges can overlap with previously inserted data

The returned time ranges will not be the exact start and end times of the inserted data. Instead, they are aligned as closely as possible to the data while also aligning with the underlying database's tree structure.

PARAMETER DESCRIPTION
from_version_map

The stream versions to compare from (exclusive).

TYPE: dict[UUID | str, int]

to_version_map

The stream versions to compare to (inclusive).

TYPE: dict[UUID | str, int]

resolution

The "coarseness" of change detection, expressed as a tree width. A resolution of 0 typically returns the finest granularity of changed intervals (which is still aligned with the internal tree structure), while a resolution of 30 aligns intervals exactly as a pointwidth of 2^30 nanoseconds would. This resolution is the same as a pointwidth; refer to the documentation on stat point widths of the tree for details.

DEFAULT: 0
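To make the resolution arithmetic concrete, the sketch below widens a changed interval so both edges land on multiples of 2^resolution nanoseconds. It is a minimal model under stated assumptions, not the server's algorithm: `align_interval` is a hypothetical helper, times are assumed to be integer nanoseconds, and the server may still return coarser intervals at resolution 0 because it follows its tree structure.

```python
def align_interval(start: int, end: int, resolution: int) -> tuple[int, int]:
    """Hypothetical helper: widen [start, end) to multiples of 2**resolution ns."""
    width = 1 << resolution                      # window size in ns for this resolution
    aligned_start = (start // width) * width     # round the start down
    aligned_end = -((-end) // width) * width     # round the end up (ceiling division)
    return aligned_start, aligned_end


start, end = 1_000_000_123, 1_000_000_999
print(align_interval(start, end, 0))   # width of 1 ns: the interval is unchanged
print(align_interval(start, end, 30))  # widened to a 2**30 ns boundary on each side
```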

RETURNS DESCRIPTION
changes

A dictionary keyed on stream UUIDs where each value is a list of dictionaries representing changed intervals. Typically the return value looks like: {UUID: [{"start": <nanosecond_timestamp>, "end": <nanosecond_timestamp>}, ...]}.

TYPE: dict[UUID, list[dict[str, int]]]

Examples:

>>> # Suppose we want to iterate over all changes between versions 9 and 42
>>> # of a two-stream StreamSet
>>> from_version_map = {s.uuid: 9 for s in streamset}
>>> to_version_map = {s.uuid: 42 for s in streamset}
>>> changed_ranges = streamset.changes(from_version_map=from_version_map,
...                                    to_version_map=to_version_map, resolution=0)
>>> for stream_uuid, intervals in changed_ranges.items():
...     print(f"Stream UUID: {stream_uuid} Changed intervals: {intervals}")
>>> # Suppose we want to iterate over all changes between version 42 and the
>>> # latest version (a version of 0 requests the latest version)
>>> from_version_map = {s.uuid: 42 for s in streamset}
>>> to_version_map = {s.uuid: 0 for s in streamset}
>>> changed_ranges = streamset.changes(from_version_map=from_version_map,
...                                    to_version_map=to_version_map, resolution=0)
>>> for stream_uuid, intervals in changed_ranges.items():
...     print(f"Stream UUID: {stream_uuid} Changed intervals: {intervals}")
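Because each value in the returned mapping is a plain list of start/end dictionaries, post-processing needs only standard Python. The sketch below uses a hand-built dictionary shaped like the documented return value (the UUIDs and timestamps are invented) and a hypothetical `summarize_changes` helper. It assumes the intervals for a stream do not overlap, so summing their widths gives the changed duration.

```python
import uuid

# Hand-built stand-in shaped like the documented return value of
# StreamSet.changes(); the UUIDs and timestamps are made up.
changed = {
    uuid.UUID("00000000-0000-0000-0000-000000000001"): [
        {"start": 0, "end": 100},
        {"start": 200, "end": 250},
    ],
    uuid.UUID("00000000-0000-0000-0000-000000000002"): [],
}


def summarize_changes(changes: dict) -> dict:
    """Per stream: bounding range and total changed duration in ns (or None)."""
    summary = {}
    for stream_uuid, intervals in changes.items():
        if not intervals:
            summary[stream_uuid] = None  # nothing changed between the two versions
            continue
        summary[stream_uuid] = {
            "first": min(iv["start"] for iv in intervals),
            "last": max(iv["end"] for iv in intervals),
            # Assumes non-overlapping intervals, so widths can simply be added.
            "changed_ns": sum(iv["end"] - iv["start"] for iv in intervals),
        }
    return summary


for stream_uuid, info in summarize_changes(changed).items():
    print(stream_uuid, info)
```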

count

Get the total count of raw measurements that are present in each stream.

PARAMETER DESCRIPTION
start

Bound the lower end of this query by a start time, by default MINIMUM_TIME.

TYPE: Optional[int | datetime | Timestamp] DEFAULT: MINIMUM_TIME

end

Bound the upper end of this query by an end time, by default MAXIMUM_TIME.

TYPE: Optional[int | datetime | Timestamp] DEFAULT: MAXIMUM_TIME

versions

Versions of the streams to query against, by default None, which means a version of 0 is used.

TYPE: Optional[int | dict[UUID, int]] DEFAULT: None

precise

Whether to return an exact count rather than an estimate, by default False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[UUID, Optional[int]]

Mapping of each stream.uuid to its count of raw measurements.
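The start and end bounds accept either integers or datetime values. The sketch below shows one way to normalize both to an integer nanosecond timestamp before calling count() or other methods. It assumes integers are nanoseconds since the Unix epoch (matching the nanosecond timestamps returned by changes()), and `to_nanoseconds` is a hypothetical helper. It rejects naive datetimes as a conservative choice; how the client itself treats them is not stated here. A pandas Timestamp is a datetime subclass, so this helper would drop its sub-microsecond part; Timestamp.value gives the full nanosecond integer.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_nanoseconds(t) -> int:
    """Hypothetical helper: int (already ns) or aware datetime -> ns since epoch."""
    if isinstance(t, int):
        return t
    if isinstance(t, datetime):
        if t.tzinfo is None:
            raise ValueError("use a timezone-aware datetime to avoid ambiguity")
        delta = t - EPOCH
        # Integer arithmetic on the timedelta avoids float rounding error.
        seconds = delta.days * 86_400 + delta.seconds
        return seconds * 1_000_000_000 + delta.microseconds * 1_000
    raise TypeError(f"unsupported time type: {type(t)!r}")


print(to_nanoseconds(datetime(2024, 1, 1, tzinfo=timezone.utc)))
```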

earliest

earliest() -> dict[UUID, Optional[Point]]

Find the earliest point (in time) that is present in the streams.

RETURNS DESCRIPTION
dict[UUID, Optional[Point]]

The earliest points in the streams.
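earliest() returns one entry per stream, with None presumably marking a stream that has no data. To find the single earliest point across the whole StreamSet, the per-stream results can be reduced as sketched below. `Point` here is a hypothetical NamedTuple stand-in with `time` and `value` fields, and `overall_earliest` is a hypothetical helper; the client's real Point type may differ.

```python
import uuid
from typing import NamedTuple, Optional


class Point(NamedTuple):  # hypothetical stand-in for the client's Point type
    time: int             # nanoseconds since the Unix epoch (assumed)
    value: float


def overall_earliest(points: dict) -> Optional[tuple]:
    """Return (uuid, Point) for the earliest point across all streams, or None."""
    present = [(u, p) for u, p in points.items() if p is not None]
    if not present:
        return None  # no stream has any data
    return min(present, key=lambda item: item[1].time)


a, b = uuid.UUID(int=1), uuid.UUID(int=2)
print(overall_earliest({a: Point(50, 1.5), b: None}))
```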

filter

filter(
    collection: Optional[str] = None,
    tags: Optional[dict[str, str]] = None,
    annotations: Optional[dict[str, str]] = None,
    refresh_metadata: bool = True,
) -> Optional[StreamSet]

Create a new StreamSet that is a subset of the streams based on metadata filtering.

PARAMETER DESCRIPTION
collection

The collection string to filter by, by default None

TYPE: Optional[str] DEFAULT: None

tags

Tag metadata to filter on, by default None

TYPE: Optional[dict[str, str]] DEFAULT: None

annotations

Annotation metadata to filter on, by default None

TYPE: Optional[dict[str, str]] DEFAULT: None

refresh_metadata

Whether to use the latest metadata of the streams when filtering, by default True.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Optional[StreamSet]

A subset of the streams that match the provided filters, if any.
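The sketch below models one plausible reading of metadata filtering: a stream is kept when its collection equals the requested one and every requested tag and annotation key is present with exactly the requested value. This exact-match rule, the `FakeStream` class and the `filter_streams` helper are all assumptions for illustration; the real filter() may support other matching rules. Returning None for an empty result mirrors the Optional[StreamSet] return type.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeStream:  # hypothetical stand-in, not the client's Stream class
    name: str
    collection: str
    tags: dict = field(default_factory=dict)
    annotations: dict = field(default_factory=dict)


def matches(stream, collection=None, tags=None, annotations=None) -> bool:
    if collection is not None and stream.collection != collection:
        return False
    # Every requested key must be present with exactly the requested value.
    for wanted, actual in ((tags, stream.tags), (annotations, stream.annotations)):
        if wanted and any(actual.get(k) != v for k, v in wanted.items()):
            return False
    return True


def filter_streams(streams, **criteria) -> Optional[list]:
    subset = [s for s in streams if matches(s, **criteria)]
    return subset or None  # empty result -> None, like Optional[StreamSet]


streams = [
    FakeStream("a", "sensors/east", tags={"unit": "volts"}),
    FakeStream("b", "sensors/west", tags={"unit": "amps"}),
]
print([s.name for s in filter_streams(streams, tags={"unit": "volts"})])
```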

flush

flush() -> dict[UUID, int]

Force a flush of the buffered data to persistent storage for all streams.

If buffered data was present, the stream's version number is incremented.

RETURNS DESCRIPTION
dict[UUID, int]

The version number of each stream after the flush as a uuid:version dictionary mapping.
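Since flush(), insert() and get_latest_version() all return uuid:version mappings, comparing two such mappings shows which streams gained a new version. The hypothetical helper below does that with plain dictionaries; the resulting old/new pairs are the kind of input changes() expects in from_version_map and to_version_map.

```python
import uuid


def streams_that_changed(before: dict, after: dict) -> dict:
    """Return {uuid: (old_version, new_version)} for streams whose version increased."""
    return {
        u: (before[u], after[u])
        for u in after
        if u in before and after[u] > before[u]
    }


# Possible usage with a real StreamSet (not executed here):
#   before = streamset.get_latest_version()
#   after = streamset.flush()
#   changed = streams_that_changed(before, after)

# Self-contained demonstration with made-up UUIDs and versions.
a, b = uuid.UUID(int=1), uuid.UUID(int=2)
print(streams_that_changed({a: 3, b: 7}, {a: 4, b: 7}))
```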

get_latest_version

get_latest_version() -> dict[UUID, int]

Get the latest version of each stream.

RETURNS DESCRIPTION
dict[UUID, int]

A mapping of stream.uuid to latest version.

get_version_at_time

get_version_at_time(
    realtime: int | datetime,
) -> dict[UUID, int]

Get the version of each stream at time realtime.

PARAMETER DESCRIPTION
realtime

The time at which to check the stream version.

TYPE: int | datetime

RETURNS DESCRIPTION
dict[UUID, int]

A mapping of stream.uuid to version.
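One way to picture get_version_at_time() is a lookup in a per-stream history of commits: the answer is the newest version committed at or before realtime. The sketch below models that with bisect over a hypothetical, time-sorted list of (commit_time_ns, version) pairs; the history format, the `version_at_time` name and returning None before the first commit are all assumptions, not the client's implementation.

```python
import bisect
from typing import Optional


def version_at_time(history: list, realtime: int) -> Optional[int]:
    """history: hypothetical [(commit_time_ns, version), ...] sorted by time.

    Returns the newest version committed at or before realtime, or None if
    nothing had been committed yet (an assumption made for this sketch).
    """
    times = [t for t, _ in history]
    idx = bisect.bisect_right(times, realtime)  # number of commits <= realtime
    return history[idx - 1][1] if idx else None


history = [(100, 1), (200, 2), (350, 3)]
print(version_at_time(history, 250))
```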

insert

insert(
    data_map: dict[UUID, Table],
    merge_policy: MergePolicy = "replace",
) -> dict[UUID, int]

Insert new timeseries data into the streams.

Default merge policy has changed!

Starting with the new pingthings API, the default merge policy is replace. Refer to the merge policy documentation for details.

Data must follow a specific schema for insertion

Your pyarrow Table or RecordBatch must have a schema that matches TIME_VALUE_F64_SCHEMA. The order of the columns matters.

PARAMETER DESCRIPTION
data_map

A mapping of stream.uuid to pyarrow.Table timeseries data.

TYPE: dict[UUID, Table]

merge_policy

How to handle inserted points whose timestamps duplicate points already in the stream.

TYPE: MergePolicy DEFAULT: 'replace'
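The replace policy can be modeled with a dictionary keyed on timestamp: an incoming point overwrites any stored point with the same timestamp. The sketch below is a hypothetical in-memory model of that rule only (`merge_replace` is not a client function); it does not represent the other MergePolicy values, and letting the last duplicate within a single batch win is an assumption of the sketch.

```python
def merge_replace(existing: dict, incoming: list) -> dict:
    """Model of 'replace': a new point overwrites a stored point at the same timestamp.

    existing: {timestamp_ns: value}; incoming: [(timestamp_ns, value), ...].
    """
    merged = dict(existing)  # leave the caller's data untouched
    for t, v in incoming:
        merged[t] = v  # later writes win, including duplicates within this batch
    return merged


stored = {1: 1.0, 2: 2.0}
print(merge_replace(stored, [(2, 20.0), (3, 3.0)]))
```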

RETURNS DESCRIPTION
dict[UUID, int]

A mapping of stream.uuid to the version of the stream after insertion.

latest

latest() -> dict[UUID, Optional[Point]]

Find the latest point (in time) that is present in the streams.

RETURNS DESCRIPTION
dict[UUID, Optional[Point]]

The latest point in the streams, if present.
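A common use of latest() is spotting streams that have stopped receiving data. The sketch below flags streams whose latest point is older than a chosen age, or that returned None. The `Point` stand-in, the `stale_streams` helper and the use of integer nanosecond times are assumptions for illustration.

```python
import uuid
from typing import NamedTuple


class Point(NamedTuple):  # hypothetical stand-in for the client's Point type
    time: int             # nanoseconds since the Unix epoch (assumed)
    value: float


def stale_streams(latest: dict, now_ns: int, max_age_ns: int) -> list:
    """UUIDs whose latest point is older than max_age_ns, or that have no data."""
    return [
        u for u, p in latest.items()
        if p is None or now_ns - p.time > max_age_ns
    ]


a, b = uuid.UUID(int=1), uuid.UUID(int=2)
print(stale_streams({a: Point(990, 1.0), b: Point(500, 2.0)}, now_ns=1000, max_age_ns=100))
```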

pin_versions

pin_versions(versions: Optional[list[int]] = None) -> None

Pin the versions of all streams to specific values.

raw_values

raw_values(
    start: int | datetime | Timestamp,
    end: int | datetime | Timestamp,
    snap_period: Optional[int | timedelta | Timedelta] = 0,
    versions: Optional[list[int]] = None,
    schema: Optional[Schema] = None,
) -> Table

Return the raw time,value pairs of data from the streams.

StreamSet raw value queries de-duplicate timestamps

The accelerated "multistream" StreamSet queries currently de-duplicate timestamps internally.

PARAMETER DESCRIPTION
start

Start time to get data (inclusive).

TYPE: int | datetime | Timestamp

end

End time for the data query (exclusive).

TYPE: int | datetime | Timestamp

snap_period

What period of time (if any) should the data be aligned to?

TYPE: Optional[int | timedelta | Timedelta] DEFAULT: 0

versions

Versions of the streams to query against; the default of None uses version 0.

TYPE: Optional[list[int]] DEFAULT: None

schema

The pyarrow.Schema to return the time,value data as; the default of None uses the server default.

TYPE: Optional[Schema] DEFAULT: None

RETURNS DESCRIPTION
Table

A table of timeseries data in the interval of [start, end)
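To illustrate what aligning and de-duplicating timestamps across streams can look like, the sketch below builds one row per timestamp with one column per stream, keeping only points in [start, end). It is a hypothetical model rather than the server's behavior: `snap_and_align` is not a client function, and flooring timestamps to a multiple of snap_period, treating 0 as no snapping, and keeping the last value when several points share a timestamp are all assumptions.

```python
def snap_and_align(streams: dict, start: int, end: int, snap_period: int) -> list:
    """Hypothetical model: one row per (snapped) timestamp, one column per stream.

    streams: {name: [(timestamp_ns, value), ...]}. Points outside [start, end)
    are dropped, timestamps are floored to a multiple of snap_period (0 means
    no snapping), and when several points land on one timestamp the last wins.
    """
    columns = {}
    for name, points in streams.items():
        col = {}
        for t, v in points:
            if not (start <= t < end):
                continue  # outside the half-open query interval
            key = (t // snap_period) * snap_period if snap_period else t
            col[key] = v  # de-duplicate: a later point overwrites an earlier one
        columns[name] = col
    times = sorted(set().union(*columns.values()))
    # Missing values for a stream at a given timestamp are filled with None.
    return [(t, *(columns[name].get(t) for name in streams)) for t in times]


data = {"a": [(0, 1.0), (7, 2.0), (12, 3.0)], "b": [(5, 10.0), (21, 20.0)]}
for row in snap_and_align(data, start=0, end=20, snap_period=10):
    print(row)
```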

refresh_metadata

refresh_metadata() -> None

Update all metadata for each stream in the StreamSet.

windowed_values

windowed_values(
    start: int | datetime | Timestamp,
    end: int | datetime | Timestamp,
    width: int | timedelta | Timedelta | _PW,
    precise: Optional[bool] = False,
    versions: Optional[list[int]] = None,
    schema: Optional[Schema] = STAT_F32_SCHEMA,
) -> Table

Return statistical summaries of the data.

PARAMETER DESCRIPTION
start

The approximate start time to get data (inclusive). See notes.

TYPE: int | datetime | Timestamp

end

The approximate end time for the data query (exclusive). See notes.

TYPE: int | datetime | Timestamp

width

The approximate size of the statistical windows to return. Each window summarizes the data from start to end in steps of size width.

TYPE: int | timedelta | Timedelta | _PW

precise

Pass in precise=True to use the exact start, end and width values specified. See notes.

TYPE: Optional[bool] DEFAULT: False

versions

Versions of the streams to query against; the default of None uses version 0.

TYPE: Optional[list[int]] DEFAULT: None

schema

The pyarrow.Schema to return the statistical summary timeseries as, by default STAT_F32_SCHEMA.

TYPE: Optional[Schema] DEFAULT: STAT_F32_SCHEMA

RETURNS DESCRIPTION
Table

The statistical summary information of the stream as a timeseries.
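Each returned window summarizes the raw points that fall inside it. The sketch below computes such summaries from raw (timestamp, value) pairs using exact, non-aligned windows, roughly the precise=True case. The field names (min, mean, max, count, stddev) follow the statistics BTrDB traditionally reports, but the exact columns of STAT_F32_SCHEMA are not stated here; `window_summaries` is a hypothetical helper, and skipping empty windows, emitting only full windows and using the population standard deviation are choices of this sketch.

```python
import math


def window_summaries(points: list, start: int, end: int, width: int) -> list:
    """Summarize (timestamp_ns, value) points in consecutive windows of `width` ns.

    Windows start at `start`; only full windows ending at or before `end`
    are emitted, and empty windows are skipped (both assumptions).
    """
    rows = []
    t = start
    while t + width <= end:
        vals = [v for ts, v in points if t <= ts < t + width]
        if vals:
            mean = sum(vals) / len(vals)
            var = sum((v - mean) ** 2 for v in vals) / len(vals)  # population variance
            rows.append({"time": t, "min": min(vals), "mean": mean, "max": max(vals),
                         "count": len(vals), "stddev": math.sqrt(var)})
        t += width
    return rows


for row in window_summaries([(0, 1.0), (1, 3.0), (10, 5.0)], start=0, end=20, width=10):
    print(row)
```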

Notes

By default (precise=False), the values provided for start, end and width may not exactly match the values the query actually uses. Instead, the window width used is the largest power of 2 ns that is smaller than the provided width. This aligns the query with BTrDB's internal tree structure and thereby increases query performance by several orders of magnitude.

Consequently, the actual time range of the data returned will be the aligned windows that fall within the range [start, end).
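The alignment described in these notes can be sketched with integer arithmetic: compute the pointwidth from width, then keep the aligned windows that lie entirely inside [start, end). This is a hypothetical model (`aligned_windows` is not a client function), and it assumes that a width which is already an exact power of two is used unchanged.

```python
def aligned_windows(start: int, end: int, width: int) -> tuple:
    """Model of precise=False: returns (pointwidth, [window_start, ...]).

    The window size becomes 2**pointwidth, where pointwidth is the exponent of
    the largest power of two not exceeding width (assumption: an exact power of
    two is kept as-is). Only windows lying entirely inside [start, end) are kept.
    """
    if width < 1:
        raise ValueError("width must be a positive number of nanoseconds")
    pw = width.bit_length() - 1           # floor(log2(width)) with exact integer math
    size = 1 << pw
    first = -((-start) // size) * size    # first aligned boundary >= start
    starts = list(range(first, end - size + 1, size))  # windows with s + size <= end
    return pw, starts


print(aligned_windows(0, 2_000_000_000, 1_000_000_000))
```

For example, a requested width of one second (1_000_000_000 ns) becomes a pointwidth of 29, that is windows of 536,870,912 ns, which is why aligned results can differ noticeably from the requested width.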