StreamSet
pingthings.timeseries.client.StreamSet
A collection of Stream objects.
Create a collection of stream objects.
| METHOD | DESCRIPTION |
|---|---|
| `changes` | Return a mapping of intervals of time that have changed between two versions for each stream. |
| `count` | Get the total count of raw measurements that are present in each stream. |
| `earliest` | Find the earliest point (in time) that is present in the streams. |
| `filter` | Create a new `StreamSet` that is a subset of the streams based on metadata filtering. |
| `flush` | Force a flush of the buffered data to persistent storage for all streams. |
| `get_latest_version` | Get the latest version of each stream. |
| `get_version_at_time` | Get the version of each stream at a given time. |
| `insert` | Insert new timeseries data into the streams. |
| `latest` | Find the latest point (in time) that is present in the streams. |
| `pin_versions` | Pin the versions of all streams to specific values. |
| `raw_values` | Return the raw (time, value) pairs of data from the streams. |
| `refresh_metadata` | Update all metadata for each stream in the `StreamSet`. |
| `windowed_values` | Return statistical summaries of the data. |
Functions
changes
changes(
from_version_map: dict[UUID | str, int],
to_version_map: dict[UUID | str, int],
resolution: int = 0,
) -> dict[UUID, list[dict[str, int]]]
Return a mapping of intervals of time that have changed between two versions for each stream.
Advanced user feature
This method can have unexpected return values if you are not familiar with the underlying timeseries database that is part of the PredictiveGrid platform.
Time ranges can overlap with previously inserted data
The returned time ranges are not the exact start and end times of the inserted data. Instead, they are aligned as closely as possible to the inserted data while also staying aligned with the underlying database's tree structure.
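As a rough mental model only, you can picture each reported interval being widened outward to the boundaries of a tree node that spans `2**pointwidth` nanoseconds. The database's real alignment rules are internal and are not documented here. The helper `align_outward` and the parameter name `pointwidth` are hypothetical and are not part of the `StreamSet` API.

```python
def align_outward(start: int, end: int, pointwidth: int) -> tuple[int, int]:
    """Widen the half-open range [start, end) to multiples of 2**pointwidth ns.

    Hypothetical illustration of tree alignment; not the library's algorithm.
    """
    width = 1 << pointwidth                    # node width in nanoseconds
    aligned_start = (start // width) * width   # round the start down to a boundary
    aligned_end = -(-end // width) * width     # round the end up (ceiling division)
    return aligned_start, aligned_end


# Data written at [1000, 1500) ns, viewed through 256 ns (2**8) nodes:
print(align_outward(1_000, 1_500, 8))  # (768, 1536)
```

The widened range can cover points that already existed before the "from" version. Under this model, that is why a reported interval may overlap previously inserted data.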
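| PARAMETER | DESCRIPTION |
|---|---|
| `from_version_map` | The stream versions to compare from (exclusive). TYPE: `dict[UUID \| str, int]` |
| `to_version_map` | The stream versions to compare to (inclusive). TYPE: `dict[UUID \| str, int]` |
| `resolution` | The "coarseness" of change detection in tree width. A resolution of … TYPE: `int` DEFAULT: `0` |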
| RETURNS | DESCRIPTION |
|---|---|
| `changes` | A dictionary keyed on stream UUIDs where each value is a list of dictionaries representing changed intervals. Typically the return looks like: … TYPE: `dict[UUID, list[dict[str, int]]]` |
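Each interval is aligned to the tree, so neighboring intervals in a stream's list may touch or overlap. The sketch below coalesces them per stream before further processing. It assumes each interval dictionary has `"start"` and `"end"` keys holding nanosecond timestamps of a half-open range. The actual key names are not shown above, so inspect a real return value first. `coalesce` is a hypothetical helper.

```python
from uuid import UUID, uuid4


def coalesce(intervals: list[dict[str, int]]) -> list[dict[str, int]]:
    """Merge overlapping or touching half-open intervals (hypothetical helper).

    Assumes each dict has "start" and "end" keys; the input list is not modified.
    """
    merged: list[dict[str, int]] = []
    for iv in sorted(intervals, key=lambda d: d["start"]):
        if merged and iv["start"] <= merged[-1]["end"]:
            # Overlaps or abuts the previous interval: extend it.
            merged[-1]["end"] = max(merged[-1]["end"], iv["end"])
        else:
            merged.append({"start": iv["start"], "end": iv["end"]})
    return merged


# Stand-in for a changes() return value, with made-up intervals.
changed: dict[UUID, list[dict[str, int]]] = {
    uuid4(): [{"start": 256, "end": 512}, {"start": 0, "end": 256}, {"start": 1024, "end": 2048}],
}
for stream_uuid, intervals in changed.items():
    print(stream_uuid, coalesce(intervals))
```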
Examples:
>>> # Suppose we want to iterate over all changes between version 9 and 42
>>> # and we have a two-stream StreamSet
>>> from_version_map = {s.uuid: 9 for s in streamset}
>>> to_version_map = {s.uuid: 42 for s in streamset}
>>> changed_ranges = streamset.changes(from_version_map=from_version_map,
...     to_version_map=to_version_map, resolution=0)
>>> for stream_uuid, intervals in changed_ranges.items():
...     print(f"Stream UUID: {stream_uuid} Changed intervals: {intervals}")
>>> # Suppose we want to iterate over all changes between version 42 and the
>>> # latest version of each stream (version 0)
>>> from_version_map = {s.uuid: 42 for s in streamset}
>>> to_version_map = {s.uuid: 0 for s in streamset}
>>> changed_ranges = streamset.changes(from_version_map=from_version_map,
...     to_version_map=to_version_map, resolution=0)
>>> for stream_uuid, intervals in changed_ranges.items():
...     print(f"Stream UUID: {stream_uuid} Changed intervals: {intervals}")
count
count(
start: Optional[
int | datetime | Timestamp
] = MINIMUM_TIME,
end: Optional[
int | datetime | Timestamp
] = MAXIMUM_TIME,
versions: Optional[int | dict[UUID, int]] = None,
precise: bool = False,
) -> dict[UUID, Optional[int]]
Get the total count of raw measurements that are present in each stream.
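| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Bound the lower end of this query by a start time, by default `MINIMUM_TIME`. TYPE: `Optional[int \| datetime \| Timestamp]` |
| `end` | Bound the upper end of this query by an end time, by default `MAXIMUM_TIME`. TYPE: `Optional[int \| datetime \| Timestamp]` |
| `versions` | Versions of the streams to query against, by default `None`, which means a version of 0 is used. TYPE: `Optional[int \| dict[UUID, int]]` |
| `precise` | Whether an exact count is required. When `False`, an estimate may be returned. By default `False`. TYPE: `bool` |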
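The `versions` parameter accepts `None`, a single `int`, or a per-stream `dict[UUID, int]`. The sketch below shows one way those forms could resolve to a version per stream. Two behaviors are assumptions, not statements from the description above: a single `int` applies to every stream, and streams missing from a dict fall back to version 0. `normalize_versions` is a hypothetical helper.

```python
from typing import Optional, Union
from uuid import UUID, uuid4


def normalize_versions(
    stream_ids: list[UUID],
    versions: Optional[Union[int, dict[UUID, int]]] = None,
) -> dict[UUID, int]:
    """Resolve a versions argument to one version per stream (hypothetical)."""
    if versions is None:
        # Documented: None means a version of 0 is used.
        return {sid: 0 for sid in stream_ids}
    if isinstance(versions, int):
        # Assumption: a single int is applied to every stream.
        return {sid: versions for sid in stream_ids}
    # Assumption: streams absent from the mapping fall back to version 0.
    return {sid: versions.get(sid, 0) for sid in stream_ids}


a, b = uuid4(), uuid4()
print(normalize_versions([a, b], {a: 12}))
```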
| RETURNS | DESCRIPTION |
|---|---|
| `dict[UUID, Optional[int]]` | Mapping of individual stream UUIDs to the count of raw measurements in each stream. |
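The return type allows `None` as a per-stream value, and the description does not say when that happens. If you need a single total across the StreamSet, a cautious approach is to skip `None` entries explicitly. The sketch below does this with made-up counts. `total_count` is a hypothetical helper.

```python
from typing import Optional
from uuid import UUID, uuid4


def total_count(counts: dict[UUID, Optional[int]]) -> int:
    """Sum per-stream counts, skipping streams whose count is None."""
    return sum(c for c in counts.values() if c is not None)


# Made-up values shaped like a count() return value.
counts = {uuid4(): 1_200, uuid4(): None, uuid4(): 800}
print(total_count(counts))  # 2000
```

With `precise=False` the per-stream numbers may be estimates, so a total built from them is an estimate as well.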
earliest
Find the earliest point (in time) that is present in the streams.
filter
filter(
collection: Optional[str] = None,
tags: Optional[dict[str, str]] = None,
annotations: Optional[dict[str, str]] = None,
refresh_metadata: bool = True,
) -> Optional[StreamSet]
Create a new StreamSet that is a subset of the streams based on metadata filtering.
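| PARAMETER | DESCRIPTION |
|---|---|
| `collection` | The collection string to filter by, by default `None`. TYPE: `Optional[str]` |
| `tags` | Tag metadata to filter on, by default `None`. TYPE: `Optional[dict[str, str]]` |
| `annotations` | Annotation metadata to filter on, by default `None`. TYPE: `Optional[dict[str, str]]` |
| `refresh_metadata` | Whether to use the latest metadata of the streams when filtering, by default `True`. TYPE: `bool` |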
| RETURNS | DESCRIPTION |
|---|---|
| `Optional[StreamSet]` | A subset of the streams that match the provided filters, if any. |
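The following is a pure-Python model for reasoning about which streams survive a filter call. It assumes a stream matches when its collection equals `collection` exactly and every given tag or annotation key/value pair is present. The library's actual matching rules (case sensitivity, prefix or pattern matching) may differ. `StreamMeta` and `filter_streams` are hypothetical stand-ins. Returning `None` for an empty result mirrors the `Optional[StreamSet]` return type.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StreamMeta:
    """Hypothetical stand-in for a stream's metadata."""
    name: str
    collection: str
    tags: dict[str, str] = field(default_factory=dict)
    annotations: dict[str, str] = field(default_factory=dict)


def _has_all(actual: dict[str, str], wanted: Optional[dict[str, str]]) -> bool:
    # Every wanted key must be present with the same value.
    return not wanted or all(actual.get(k) == v for k, v in wanted.items())


def filter_streams(
    streams: list[StreamMeta],
    collection: Optional[str] = None,
    tags: Optional[dict[str, str]] = None,
    annotations: Optional[dict[str, str]] = None,
) -> Optional[list[StreamMeta]]:
    subset = [
        s for s in streams
        if (collection is None or s.collection == collection)
        and _has_all(s.tags, tags)
        and _has_all(s.annotations, annotations)
    ]
    return subset or None  # None when nothing matches


streams = [
    StreamMeta("va", "substation/a", {"unit": "volts"}),
    StreamMeta("ia", "substation/a", {"unit": "amps"}),
]
print([s.name for s in filter_streams(streams, tags={"unit": "volts"})])  # ['va']
```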
flush
Force a flush of the buffered data to persistent storage for all streams.
get_latest_version
Get the latest version of each stream.
get_version_at_time
Get the version of each stream at a given time.
insert
insert(
data_map: dict[UUID, Table],
merge_policy: MergePolicy = "replace",
) -> dict[UUID, int]
Insert new timeseries data into the streams.
Default merge policy has changed!
Starting with the new PingThings API, the default merge policy is now "replace".
Please refer to the merge policy docs.
Data must follow a specific schema for insertion
Your data (a pyarrow Table or RecordBatch) must have a schema that matches the TIME_VALUE_F64_SCHEMA definition. The order of the columns matters.
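Column order matters, so a cheap pre-flight check before calling insert can catch a table with swapped columns. The sketch assumes the schema's fields are named `time` and `value`, in that order. Those names are an assumption, so read the real field names from TIME_VALUE_F64_SCHEMA in your installed version. With a pyarrow table you would pass `table.column_names`. `check_column_order` is a hypothetical helper.

```python
# Assumption: field names of TIME_VALUE_F64_SCHEMA; verify against the real schema.
EXPECTED_COLUMNS = ["time", "value"]


def check_column_order(column_names: list[str]) -> None:
    """Raise ValueError unless the columns are exactly EXPECTED_COLUMNS, in order."""
    if list(column_names) != EXPECTED_COLUMNS:
        raise ValueError(
            f"expected columns {EXPECTED_COLUMNS} in this order, got {list(column_names)}"
        )


check_column_order(["time", "value"])      # passes silently
try:
    check_column_order(["value", "time"])  # same columns, wrong order
except ValueError as err:
    print(err)
```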
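| PARAMETER | DESCRIPTION |
|---|---|
| `data_map` | A mapping of stream UUIDs to the tables of data to insert. TYPE: `dict[UUID, Table]` |
| `merge_policy` | How to handle points with duplicate timestamps when they are inserted into the stream. TYPE: `MergePolicy` DEFAULT: `"replace"` |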
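The default "replace" policy can be pictured this way: an incoming point whose timestamp already exists in the stream overwrites the stored value, and all other points are added. This is a simplified in-memory model, not how the database stores data. Other MergePolicy values are not covered. Treating duplicates within a single batch as last-one-wins is an assumption. `merge_replace` is a hypothetical helper.

```python
def merge_replace(
    existing: dict[int, float], incoming: list[tuple[int, float]]
) -> dict[int, float]:
    """Model a 'replace' merge: incoming points overwrite same-timestamp points."""
    merged = dict(existing)  # leave the caller's data untouched
    for t, v in incoming:
        merged[t] = v        # later duplicate timestamps win (assumption)
    return dict(sorted(merged.items()))  # keep timestamps in ascending order


stored = {100: 1.0, 200: 2.0}
print(merge_replace(stored, [(200, 20.0), (300, 3.0)]))
# {100: 1.0, 200: 20.0, 300: 3.0}
```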
| RETURNS | DESCRIPTION |
|---|---|
| `dict[UUID, int]` | A mapping of … |
latest
Find the latest point (in time) that is present in the streams.
pin_versions
Pin the versions of all streams to specific values.
raw_values
raw_values(
start: int | datetime | Timestamp,
end: int | datetime | Timestamp,
snap_period: Optional[int | timedelta | Timedelta] = 0,
versions: Optional[list[int]] = None,
schema: Optional[Schema] = None,
) -> Table
Return the raw (time, value) pairs of data from the streams.
StreamSet raw value queries de-duplicate timestamps
The accelerated "multistream" StreamSet queries currently de-duplicate timestamps internally.
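A multistream query returns one table for all streams, so each timestamp can be pictured as appearing once as a row, with one value column per stream. The sketch below models that outer alignment in plain Python. Streams without a point at a given timestamp get `None`. If one stream has the same timestamp twice, only the last value is kept. The description above does not say which duplicate the real query keeps, so that choice is an assumption. `align_streams` is a hypothetical helper.

```python
from typing import Optional


def align_streams(
    series: dict[str, list[tuple[int, float]]],
) -> list[tuple[int, dict[str, Optional[float]]]]:
    """Outer-join several (time, value) series on timestamp, one row per timestamp."""
    names = list(series)
    rows: dict[int, dict[str, Optional[float]]] = {}
    for name, points in series.items():
        for t, v in points:
            row = rows.setdefault(t, {n: None for n in names})
            row[name] = v  # a repeated timestamp overwrites the earlier value
    return [(t, rows[t]) for t in sorted(rows)]


table = align_streams({
    "a": [(1, 1.0), (2, 2.0), (2, 2.5)],
    "b": [(2, 20.0), (3, 30.0)],
})
for t, values in table:
    print(t, values)
```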
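| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime \| Timestamp` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime \| Timestamp` |
| `snap_period` | What period of time (if any) the data should be aligned to. TYPE: `Optional[int \| timedelta \| Timedelta]` DEFAULT: `0` |
| `versions` | What versions of the streams to query against, using the default of … TYPE: `Optional[list[int]]` DEFAULT: `None` |
| `schema` | What … TYPE: `Optional[Schema]` DEFAULT: `None` |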
| RETURNS | DESCRIPTION |
|---|---|
| `Table` | A table of timeseries data in the interval [start, end). |
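The start and end arguments accept integers or datetime-like values, and the result covers the half-open interval [start, end). The sketch below assumes integer times are nanoseconds since the Unix epoch, which is consistent with the nanosecond widths in the windowed_values notes. It shows how a timezone-aware `datetime` maps to that scale and which points a half-open range keeps. `to_ns` and `select_half_open` are hypothetical helpers, and pandas `Timestamp` handling is left out.

```python
from datetime import datetime, timedelta, timezone
from typing import Union

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_ns(t: Union[int, datetime]) -> int:
    """Convert an int (assumed ns) or a timezone-aware datetime to epoch nanoseconds."""
    if isinstance(t, int):
        return t
    # timedelta // timedelta gives an int; datetime resolution is 1 microsecond.
    return (t - EPOCH) // timedelta(microseconds=1) * 1_000


def select_half_open(
    points: list[tuple[int, float]],
    start: Union[int, datetime],
    end: Union[int, datetime],
) -> list[tuple[int, float]]:
    """Keep points with start <= time < end."""
    lo, hi = to_ns(start), to_ns(end)
    return [(t, v) for t, v in points if lo <= t < hi]


one_second = datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc)
points = [(0, 0.0), (500_000_000, 0.5), (1_000_000_000, 1.0)]
print(select_half_open(points, 0, one_second))  # [(0, 0.0), (500000000, 0.5)]
```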
windowed_values
windowed_values(
start: int | datetime | Timestamp,
end: int | datetime | Timestamp,
width: int | timedelta | Timedelta | _PW,
precise: Optional[bool] = False,
versions: Optional[list[int]] = None,
schema: Optional[Schema] = STAT_F32_SCHEMA,
) -> Table
Return statistical summaries of the data.
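| PARAMETER | DESCRIPTION |
|---|---|
| `start` | The approximate start time to get data (inclusive). See notes. TYPE: `int \| datetime \| Timestamp` |
| `end` | The approximate end time for the data query (exclusive). See notes. TYPE: `int \| datetime \| Timestamp` |
| `width` | The approximate size of the statistical windows to return. Each window contains summaries of the data between … TYPE: `int \| timedelta \| Timedelta \| _PW` |
| `precise` | Pass in … TYPE: `Optional[bool]` DEFAULT: `False` |
| `versions` | What versions of the stream to query against, using the default of … TYPE: `Optional[list[int]]` DEFAULT: `None` |
| `schema` | What … TYPE: `Optional[Schema]` DEFAULT: `STAT_F32_SCHEMA` |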
| RETURNS | DESCRIPTION |
|---|---|
| `Table` | The statistical summary information of the stream as a timeseries. |
Notes
By default (precise=False), the values provided to start, end and width may not fully align with
the values actually used by the query. Instead, the window width used will be the largest power of 2 ns
that is smaller than the provided window. Doing so aligns the query with BTrDB's internal tree structure
and thus increases query performance by several orders of magnitude.
Consequently, the actual time range of the data pulled will be the aligned values that fall within the range [start, end).
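The alignment described in these notes can be worked through in a few lines of code. This sketch reads "the largest power of 2 ns that is smaller than the provided window" as "not larger than", so a width that is already a power of two is used unchanged. That reading is an assumption. So is the choice to round start up and end down to window boundaries, which is based on the sentence about aligned values falling within [start, end). `pointwidth_for` and `aligned_range` are hypothetical helpers and are not part of the API.

```python
def pointwidth_for(width_ns: int) -> int:
    """Exponent of the largest power of two that does not exceed width_ns."""
    if width_ns < 1:
        raise ValueError("width must be at least 1 ns")
    return width_ns.bit_length() - 1


def aligned_range(start: int, end: int, width_ns: int) -> tuple[int, int, int]:
    """Return (aligned_start, aligned_end, window) with boundaries inside [start, end)."""
    window = 1 << pointwidth_for(width_ns)
    aligned_start = -(-start // window) * window  # first boundary at or after start
    aligned_end = (end // window) * window        # last boundary at or before end
    return aligned_start, aligned_end, window


# Asking for 1000 ns windows over [1000, 5000) ns:
print(aligned_range(1_000, 5_000, 1_000))  # (1024, 4608, 512)
```

In this example, the requested 1000 ns width becomes 512 ns (2**9). The query then covers seven full windows, from 1024 ns to 4608 ns.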