AsyncStreamSet
pingthings.timeseries.async_client.AsyncStreamSet
¶
AsyncStreamSet(streams: list[AsyncStream] = [])
A collection of AsyncStream objects.
Create a collection of AsyncStream objects.
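A minimal construction sketch (assuming `stream_a` and `stream_b` are `AsyncStream` objects already retrieved from your connection):
>>> streamset = AsyncStreamSet(streams=[stream_a, stream_b])
>>> # e.g., get the raw point count per stream with all-default bounds
>>> counts = await streamset.count()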
| METHOD | DESCRIPTION |
|---|---|
| `aligned_windows` | Return statistical summary timeseries data aligned with the internal tree structure of the database. |
| `changes` | Return a mapping of intervals of time that have changed between two versions for each stream. |
| `count` | Get the total count of raw measurements that are present in each stream. |
| `earliest` | Find the earliest point (in time) that is present in the streams. |
| `filter` | Create a new `AsyncStreamSet` that is a subset of the streams based on metadata filtering. |
| `flush` | Force a flush of the buffered data to persistent storage for all streams. |
| `get_latest_version` | Get the latest version of each stream. |
| `get_version_at_time` | Get the version of each stream at a given time. |
| `insert` | Insert new timeseries data into the streams. |
| `latest` | Find the latest point (in time) that is present in the streams. |
| `pin_versions` | Pin the versions of all streams to specific values. |
| `precise_windows` | Return custom-sized statistical summaries of the data. |
| `raw_values` | Return the raw time,value pairs of data from the streams. |
| `refresh_metadata` | Update all metadata for each stream in the `AsyncStreamSet`. |
Functions¶
aligned_windows
async
¶
aligned_windows(
start: int | datetime,
end: int | datetime,
point_width: int | _PW,
versions: Optional[list[int]] = None,
single_stream_schema: Optional[
Schema
] = STAT_F32_SCHEMA,
) -> list[Table | None]
Return statistical summary timeseries data aligned with the internal tree structure of the database.
Query BTrDB for aggregates (also called rollups or windows) of the time series, at the requested versions, between time `start` (inclusive) and `end` (exclusive) in nanoseconds: \([start, end)\).
Each point returned is a statistical aggregate of all the raw data within a window of width \(2^{point\_width}\) nanoseconds.
These statistical aggregates currently include the mean, minimum, maximum, count, and standard deviation of the data composing the window.
Understanding aligned_windows queries
- `start` is inclusive, but `end` is exclusive. Results will be returned for all windows that start in the interval \([start, end)\).
- If \(end < start + 2^{point\_width}\), you will not get any results.
- If `start` and `end` are not powers of two, the bottom `point_width` bits will be cleared, aligning them to the nearest multiple of \(2^{point\_width}\). For example, if you query data between `[31, 121)` with `point_width = 4`, the actual query will be performed on `[16, 112]`.
- Each window will contain statistical summaries of the data within that window.
- Statistical points with `count == 0` will be omitted.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime` |
| `point_width` | What size statistical windows to return, aligned to a size of the internal tree. Value will be interpreted as \(2^{point\_width}\). TYPE: `int \| _PW` |
| `versions` | What version of each stream to query against, using the default of `None`. TYPE: `Optional[list[int]]` |
| `single_stream_schema` | What `Schema` to use for each stream's statistical summary table, by default `STAT_F32_SCHEMA`. TYPE: `Optional[Schema]` |
| RETURNS | DESCRIPTION |
|---|---|
| `list[Table \| None]` | Statistical summary timeseries of the streams between \([start, end)\) |
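As an illustrative sketch (assuming `streamset` is an existing `AsyncStreamSet`), a `point_width` of 30 requests windows of \(2^{30}\) nanoseconds, roughly 1.07 seconds, each:
>>> from datetime import datetime, timezone
>>> start = datetime(2023, 1, 1, tzinfo=timezone.utc)
>>> end = datetime(2023, 1, 2, tzinfo=timezone.utc)
>>> # point_width=30 -> windows of 2**30 ns, aligned with the internal tree
>>> tables = await streamset.aligned_windows(start=start, end=end, point_width=30)
>>> for table in tables:
...     if table is not None:
...         print(table.num_rows)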
changes
async
¶
changes(
from_version_map: dict[UUID | str, int],
to_version_map: dict[UUID | str, int],
resolution: int = 0,
) -> dict[UUID, list[dict[str, int]]]
Return a mapping of intervals of time that have changed between two versions for each stream.
Advanced user feature
This method can have unexpected return values if you are not familiar with the underlying timeseries database that is part of the PredictiveGrid platform.
Time ranges can overlap with previously inserted data
The returned time ranges will not be the exact start and end times of the inserted data. Instead, they will be aligned as closely as possible while still conforming to the underlying database's tree structure.
| PARAMETER | DESCRIPTION |
|---|---|
| `from_version_map` | The stream versions to compare from (exclusive). TYPE: `dict[UUID \| str, int]` |
| `to_version_map` | The stream versions to compare to (inclusive). TYPE: `dict[UUID \| str, int]` |
| `resolution` | The "coarseness" of change detection in tree width. DEFAULT: `0` |
| RETURNS | DESCRIPTION |
|---|---|
| `dict[UUID, list[dict[str, int]]]` | A dictionary keyed on stream UUIDs, where each value is a list of dictionaries representing the changed intervals (see the examples below). |
Examples:
>>> # Suppose we want to iterate over all changes between version 9 and 42
>>> # and we have a two-stream streamset
>>> from_version_map = {s.uuid: 9 for s in streamset}
>>> to_version_map = {s.uuid: 42 for s in streamset}
>>> changed_ranges = await streamset.changes(
...     from_version_map=from_version_map, to_version_map=to_version_map, resolution=0
... )
>>> for stream_uuid, changed_range in changed_ranges.items():
...     print(f"Stream UUID: {stream_uuid} Changed intervals: {changed_range}")
>>> # Suppose we want to iterate over all changes between version 42
>>> # and the latest version of each stream
>>> from_version_map = {s.uuid: 42 for s in streamset}
>>> to_version_map = {s.uuid: 0 for s in streamset}
>>> changed_ranges = await streamset.changes(
...     from_version_map=from_version_map, to_version_map=to_version_map, resolution=0
... )
>>> for stream_uuid, changed_range in changed_ranges.items():
...     print(f"Stream UUID: {stream_uuid} Changed intervals: {changed_range}")
count
async
¶
count(
start: Optional[int] = None,
end: Optional[int] = None,
versions: Optional[int | dict[UUID, int]] = None,
precise: bool = False,
) -> dict[UUID, int]
Get the total count of raw measurements that are present in each stream.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Bound the lower end of this query by a start time, by default `MINIMUM_TIME`. TYPE: `Optional[int]` |
| `end` | Bound the upper end of this query by an end time, by default `MAXIMUM_TIME`. TYPE: `Optional[int]` |
| `versions` | Versions of the streams to query against, by default `None`, which means a version of 0 is used. TYPE: `Optional[int \| dict[UUID, int]]` |
| `precise` | Do we need an exact count, or is an estimate acceptable? By default `False`. TYPE: `bool` |
| RETURNS | DESCRIPTION |
|---|---|
| `dict[UUID, int]` | Mapping of each individual stream UUID to its count of raw points. |
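A short sketch, assuming `streamset` is an existing `AsyncStreamSet`; with `precise=False` the platform may return an estimate, which is typically much cheaper than an exact count:
>>> counts = await streamset.count(precise=False)
>>> for stream_uuid, n in counts.items():
...     print(f"{stream_uuid}: ~{n} raw points")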
earliest
async
¶
Find the earliest point (in time) that is present in the streams.
filter
async
¶
filter(
collection: Optional[str] = None,
tags: Optional[dict[str, str]] = None,
annotations: Optional[dict[str, str]] = None,
refresh_metadata: bool = True,
) -> Optional[AsyncStreamSet]
Create a new AsyncStreamSet that is a subset of the streams based on metadata filtering.
| PARAMETER | DESCRIPTION |
|---|---|
| `collection` | The collection string to filter by, by default `None`. TYPE: `Optional[str]` |
| `tags` | Tag metadata to filter on, by default `None`. TYPE: `Optional[dict[str, str]]` |
| `annotations` | Annotation metadata to filter on, by default `None`. TYPE: `Optional[dict[str, str]]` |
| `refresh_metadata` | Should we use the latest metadata of the streams to filter on? By default `True`. TYPE: `bool` |
| RETURNS | DESCRIPTION |
|---|---|
| `Optional[AsyncStreamSet]` | A subset of the streams that match the provided filters, if any. |
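An illustrative sketch; the collection and tag values below are made up, and `streamset` is assumed to be an existing `AsyncStreamSet`:
>>> subset = await streamset.filter(
...     collection="sunshine/PMU1",
...     tags={"unit": "volts"},
... )
>>> if subset is not None:
...     counts = await subset.count()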
flush
async
¶
Force a flush of the buffered data to persistent storage for all streams.
get_latest_version
async
¶
Get the latest version of each stream.
get_version_at_time
async
¶
Get the version of each stream at a given time.
insert
async
¶
insert(
data_map: dict[UUID, Table],
merge_policy: MergePolicy = "replace",
) -> dict[UUID, int]
Insert new timeseries data into the streams.
Default merge policy has changed!
Starting with the new pingthings API, the default merge policy is now `replace`.
Please refer to the merge policy docs.
Data must follow a specific schema for insertion
Your pyarrow `Table` or `RecordBatch` must have a schema that matches `TIME_VALUE_F64_SCHEMA`; the order of the columns matters.
| PARAMETER | DESCRIPTION |
|---|---|
| `data_map` | A mapping of stream UUID to a pyarrow `Table` of new data to insert. TYPE: `dict[UUID, Table]` |
| `merge_policy` | How to handle duplicate timestamps in the data. TYPE: `MergePolicy` DEFAULT: `"replace"` |
| RETURNS | DESCRIPTION |
|---|---|
| `dict[UUID, int]` | A mapping of stream UUID to the stream's new version after the insert. |
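A hedged sketch of an insert: the column names `time` and `value` are illustrative assumptions about `TIME_VALUE_F64_SCHEMA` (int64 nanosecond timestamps followed by float64 values), so check the schema constant for the exact field names, and `stream_a` is assumed to be one of the set's `AsyncStream` objects:
>>> import pyarrow as pa
>>> times = pa.array(
...     [1_700_000_000_000_000_000, 1_700_000_000_033_333_333], type=pa.int64()
... )
>>> values = pa.array([59.98, 60.01], type=pa.float64())
>>> table = pa.table({"time": times, "value": values})  # column order matters
>>> new_versions = await streamset.insert(
...     data_map={stream_a.uuid: table}, merge_policy="replace"
... )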
latest
async
¶
Find the latest point (in time) that is present in the streams.
pin_versions
async
¶
Pin the versions of all streams to specific values.
precise_windows
async
¶
precise_windows(
start: int | datetime,
end: int | datetime,
width: int,
depth: Optional[int] = 0,
versions: Optional[list[int]] = None,
single_stream_schema: Optional[
Schema
] = STAT_F32_SCHEMA,
) -> list[Table | None]
Return custom-sized statistical summaries of the data.
Understanding windows queries
- `precise_windows` returns arbitrary precision statistical summary windows from the platform. It is slower than `aligned_windows`, but can be significantly faster than raw value queries (`raw_values`).
- Each returned window will be `width` nanoseconds long.
- `start` is inclusive, but `end` is exclusive (e.g., if `end < start + width` you will get no results).
- Results will be returned for all windows that start at a time less than the `end` timestamp.
- If (`end` - `start`) is not a multiple of `width`, then `end` will be decreased to the greatest value less than `end` such that (`end` - `start`) is a multiple of `width` (i.e., we set `end = start + width * floordiv(end - start, width)`).
- Windows that have no data points (`count == 0`) will be omitted from the returned table.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime` |
| `width` | The size of each statistical window to return, in nanoseconds; each window will contain a summary of the data between its start and start + `width`. TYPE: `int` |
| `depth` | The maximum tradeoff in computation of the statistical summaries, expressed as how far down the tree we need to walk. By default `0`, which is the most accurate; a range of 0-63 can be used depending on the time range of the data. DEFAULT: `0` |
| `versions` | What version of the streams to query against, using the default of `None`. TYPE: `Optional[list[int]]` |
| `single_stream_schema` | What `Schema` to use for each stream's statistical summary table, by default `STAT_F32_SCHEMA`. TYPE: `Optional[Schema]` |
| RETURNS | DESCRIPTION |
|---|---|
| `list[Table \| None]` | The statistical summary information of the streams as a timeseries. |
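A sketch of a one-minute windowing call, assuming `streamset`, `start`, and `end` are defined as in the earlier examples; `width` is given in nanoseconds:
>>> one_minute_ns = 60 * 10**9
>>> tables = await streamset.precise_windows(
...     start=start, end=end, width=one_minute_ns, depth=0
... )
>>> for table in tables:
...     if table is not None:
...         print(table.num_rows)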
raw_values
async
¶
raw_values(
start: int | datetime,
end: int | datetime,
snap_period: int | timedelta = 0,
versions: Optional[list[int]] = None,
schema: Optional[Schema] = None,
) -> Table
Return the raw time,value pairs of data from the streams.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime` |
| `snap_period` | What period of time (if any) should the data be aligned (snapped) to? TYPE: `int \| timedelta` DEFAULT: `0` |
| `versions` | What versions of the streams to query against, using the default of `None`. TYPE: `Optional[list[int]]` |
| `schema` | What `Schema` to use for the returned table, by default `None`. TYPE: `Optional[Schema]` |
| RETURNS | DESCRIPTION |
|---|---|
| `Table` | A table of timeseries data in the interval \([start, end)\) |
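A sketch of a raw value query over the same interval, assuming `streamset`, `start`, and `end` as above; with the default `snap_period=0`, no alignment is applied:
>>> table = await streamset.raw_values(start=start, end=end)
>>> print(table.num_rows)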
refresh_metadata
async
¶
refresh_metadata() -> None
Update all metadata for each stream in the `AsyncStreamSet`.
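A brief sketch (the tag value is illustrative): refresh the cached metadata, then filter against the fresh values.
>>> await streamset.refresh_metadata()
>>> subset = await streamset.filter(tags={"unit": "amps"})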