AsyncStream
pingthings.timeseries.async_client.AsyncStream
AsyncStream(
client: AsyncClient,
uuid: UUID,
known_to_exist: bool = False,
collection: Optional[str] = None,
tags: Optional[dict[str, str]] = None,
annotations: Optional[dict[str, Any]] = None,
property_version: int = 0,
retention: Optional[dict[str, Any]] = None,
)
The base representation of a timeseries signal in the platform.
Refresh the metadata!
If you manually create an AsyncStream, the metadata such as the collection and name is not currently populated on creation.
Make sure to call stream.refresh_metadata() to populate it.
| PARAMETER | DESCRIPTION |
|---|---|
| `client` | An instance of the async client connection to the platform. TYPE: `AsyncClient` |
| `uuid` | Unique identifier of the timeseries signal, required. TYPE: `UUID` |
| `known_to_exist` | If this stream is known to already exist, some round trips to the database can be skipped, by default False. TYPE: `bool` |
| `collection` | The collection string that the signal belongs to, by default None. TYPE: `Optional[str]` |
| `tags` | Tag-based metadata of the signal, by default None. TYPE: `Optional[dict[str, str]]` |
| `annotations` | Non-tag metadata of the signal, by default None. TYPE: `Optional[dict[str, Any]]` |
| `property_version` | Internal version of the signal, by default 0. TYPE: `int` |
| `retention` | Retention policy of the stream, by default None. TYPE: `Optional[dict[str, Any]]` |
| METHOD | DESCRIPTION |
|---|---|
| `aligned_windows` | Return statistical summary timeseries data aligned with the internal tree structure of the database. |
| `aligned_windows_iter` | Return statistical summary timeseries data aligned with the internal tree structure of the database. |
| `annotations` | Return the metadata that is not present in the tag-metadata of the stream. |
| `changes` | Return a list of intervals of time that have changed between two versions. |
| `changes_iter` | Return the intervals of time that have changed between two versions as a generator. |
| `count` | Get the total count of raw measurements that are present in the stream. |
| `delete` | "Delete" all points between `[start, end)`. |
| `earliest` | Find the earliest point (in time) that is present in the stream. |
| `flush` | Force a flush of the buffered data to persistent storage. |
| `get_latest_version` | Get the current version of the stream. |
| `get_retention` | Get the retention policy of the stream. |
| `get_version_at_time` | Return the version of the stream at the time provided. |
| `insert` | Add new timeseries data to the stream. |
| `latest` | Find the latest point (in time) that is present in the stream. |
| `nearest` | Get the nearest datapoint at `time`. |
| `obliterate` | Completely remove the stream from the platform. |
| `pin_version` | Set the version of the stream to a specific version number, useful for reproducible data queries. |
| `pinned_version` | Return the pinned version of the stream. |
| `precise_windows` | Return custom-sized statistical summaries of the data. |
| `precise_windows_iter` | Return custom-sized statistical summaries of the data. |
| `raw_values` | Return the raw time,value pairs of data from the stream between `start` and `end`. |
| `raw_values_iter` | Return the raw timeseries data from the stream between `start` and `end` as a generator. |
| `refresh_metadata` | Refreshes the locally cached metadata for a stream from the server. |
| `set_retention` | Set retention policy on a stream. |
| `tags` | Return the tag-based metadata of the stream. |
| `update` | Update the stream metadata. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `collection` | Return the collection the stream belongs to. TYPE: `str \| None` |
| `name` | Return the name of the stream. TYPE: `str` |
| `uuid` | Return the unique identifier of the stream. TYPE: `UUID` |
Attributes
collection (property)
collection: str | None
Return the collection the stream belongs to.
| RETURNS | DESCRIPTION |
|---|---|
| `str \| None` | The collection of the stream. |
name (property)
name: str
Return the name of the stream.
| RETURNS | DESCRIPTION |
|---|---|
| `str` | The name of the stream. |
uuid (property)
uuid: UUID
Return the unique identifier of the stream.
| RETURNS | DESCRIPTION |
|---|---|
| `UUID` | The unique identifier of the stream. |
Functions
aligned_windows (async)
aligned_windows(
start: int | datetime | Timestamp,
end: int | datetime | Timestamp,
point_width: Optional[int | _PW] = None,
width: Optional[
int | timedelta | Timedelta | _PW
] = None,
version: Optional[int] = None,
schema: Optional[Schema] = STAT_F32_SCHEMA,
) -> Table
Return statistical summary timeseries data aligned with the internal tree structure of the database.
Query BTrDB for aggregates (or roll ups or windows) of the time series with version between time start (inclusive) and end (exclusive) in nanoseconds [start, end).
Each point returned is a statistical aggregate of all the raw data within a window of width 2**pointwidth nanoseconds.
These statistical aggregates currently include the mean, minimum, maximum, count, and standard deviation of the data composing the window.
Understanding aligned_windows queries
startis inclusive, butendis exclusive. Results will be returned for all windows that start in the interval \([start, end)\).- If \(end < start + 2^{pointwidth}\), you will not get any results.
- If
startandendare not powers of two, the bottompointwidthbits will be cleared, aligning them to the nearest multiple of \(2^{pointwidth}\). For example, if you query data between[31, 121)withpointwidth = 4, the actual query will be performed on[16, 112]. - Each window will contain statistical summaries of the window.
- Statistical points with
count == 0will be omitted.
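The bit-clearing alignment described above can be sketched in plain Python. The helper name below is ours for illustration, not part of the client:

```python
def align_to_pointwidth(start: int, end: int, pointwidth: int) -> tuple[int, int]:
    """Clear the bottom `pointwidth` bits of the query bounds, mirroring how
    aligned_windows snaps them to multiples of 2**pointwidth."""
    mask = ~((1 << pointwidth) - 1)
    return start & mask, end & mask

# The documented example: [31, 121) with pointwidth = 4 aligns to 16 and 112.
print(align_to_pointwidth(31, 121, 4))  # (16, 112)
```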
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime \| Timestamp` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime \| Timestamp` |
| `point_width` | What size statistical windows to return, aligned to a size of the internal tree. Value will be interpreted as \(2^{point\_width}\), by default None. TYPE: `Optional[int \| _PW]` |
| `width` | The size of statistical windows to return; each window will contain summaries of the data it spans, by default None. TYPE: `Optional[int \| timedelta \| Timedelta \| _PW]` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What `Schema` the returned data should use, by default STAT_F32_SCHEMA. TYPE: `Optional[Schema]` |

| RETURNS | DESCRIPTION |
|---|---|
| `Table` | Statistical summary timeseries of the stream between \([start, end)\). |
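If you know the window size you want in nanoseconds, a rough way to pick a `point_width` is the floor of its base-2 logarithm. A sketch, with a helper name that is ours and not part of the client:

```python
import math

def pointwidth_for(target_window_ns: int) -> int:
    """Pick the largest pointwidth whose window (2**pointwidth ns) does not
    exceed the target window size. Illustrative helper only."""
    return int(math.floor(math.log2(target_window_ns)))

# Roughly one-second windows: 2**29 ns is about 0.54 s, 2**30 ns about 1.07 s.
pw = pointwidth_for(1_000_000_000)
print(pw, 2**pw)  # 29 536870912
```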
aligned_windows_iter (async)
aligned_windows_iter(
start: int | datetime,
end: int | datetime,
point_width: int,
version: Optional[int] = None,
schema: Optional[Schema] = STAT_F32_SCHEMA,
) -> AsyncGenerator[RecordBatch, None]
Return statistical summary timeseries data aligned with the internal tree structure of the database.
Query BTrDB for aggregates (or roll ups or windows) of the time series with version between time start (inclusive) and end (exclusive) in nanoseconds [start, end).
Each point returned is a statistical aggregate of all the raw data within a window of width 2**pointwidth nanoseconds.
These statistical aggregates currently include the mean, minimum, maximum, count, and standard deviation of the data composing the window.
Understanding aligned_windows queries

- `start` is inclusive, but `end` is exclusive. Results will be returned for all windows that start in the interval \([start, end)\).
- If \(end < start + 2^{pointwidth}\), you will not get any results.
- If `start` and `end` are not powers of two, the bottom `pointwidth` bits will be cleared, aligning them to the nearest multiple of \(2^{pointwidth}\). For example, if you query data between `[31, 121)` with `pointwidth = 4`, the actual query will be performed on `[16, 112]`.
- Each window will contain statistical summaries of the window.
- Statistical points with `count == 0` will be omitted.
Memory efficient queries
For long time ranges with small window widths (many statistical summaries), using the _iter methods is significantly more memory efficient.
These methods return data in batches, which are guaranteed to be returned in sorted order, allowing you to query much larger time ranges.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime` |
| `point_width` | What size statistical windows to return, aligned to a size of the internal tree. Value will be interpreted as \(2^{point\_width}\). TYPE: `int` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What `Schema` the returned data should use, by default STAT_F32_SCHEMA. TYPE: `Optional[Schema]` |

| YIELDS | DESCRIPTION |
|---|---|
| `AsyncGenerator[RecordBatch, None]` | Statistical summary timeseries of the stream between \([start, end)\). |
annotations (async)
Return the metadata that is not present in the tag-metadata of the stream.
| PARAMETER | DESCRIPTION |
|---|---|
| *(name elided)* | Should the returned dict be copied or passed by reference, by default True. TYPE: `bool` |
| *(name elided)* | Do we need to get the latest metadata from the platform first, by default True. |

| RETURNS | DESCRIPTION |
|---|---|
| `dict[str, Any]` | A mapping of annotation metadata key,value pairs. |
changes (async)
changes(
from_version: int, to_version: int, resolution: int = 30
) -> list[dict[str, int] | None]
Return a list of intervals of time that have changed between two versions.
Unlike the _iter version, this method buffers and returns the changed intervals
in a single list.
Advanced user feature
This method can have unexpected return values if you are not familiar with the underlying timeseries database that is part of the PredictiveGrid platform.
Time ranges can overlap with previously inserted data
The returned timeranges will not be exact start and end times for the data inserted. Instead, they will be aligned as closely as they can be, but also in alignment with the underlying database's tree structure.
| PARAMETER | DESCRIPTION |
|---|---|
| `from_version` | The stream version to compare from (exclusive). TYPE: `int` |
| `to_version` | The stream version to compare to (inclusive). TYPE: `int` |
| `resolution` | The "coarseness" of change detection in tree width. DEFAULT: 30 |

| RETURNS | DESCRIPTION |
|---|---|
| `list[dict[str, int] \| None]` | A list of dictionaries representing changed intervals. |
Examples:
>>> # Suppose we want to get all changes between version 9 and 42
>>> changed_ranges = await stream.changes(from_version=9, to_version=42, resolution=0)
>>> for changed_range in changed_ranges:
... print("Changed interval:", changed_range)
>>> # Get all changes between version 9 and the latest (e.g., version=0) asynchronously
>>> changed_ranges = await stream.changes(from_version=9, to_version=0, resolution=0)
>>> for changed_range in changed_ranges:
... print("Changed interval:", changed_range)
changes_iter (async)
changes_iter(
from_version: int, to_version: int, resolution: int = 30
) -> AsyncGenerator[dict[str, int], None]
Return the intervals of time that have changed between two versions as a generator.
Memory efficient queries
This method yields each changed interval one at a time. If you need to process
many intervals without storing them all in memory at once, this _iter method is
more memory efficient than its non-iter counterpart.
Advanced user feature
This method can have unexpected return values if you are not familiar with the underlying timeseries database that is part of the PredictiveGrid platform.
Time ranges can overlap with previously inserted data
The returned timeranges will not be exact start and end times for the data inserted. Instead, they will be aligned as closely as they can be, but also in alignment with the underlying database's tree structure.
| PARAMETER | DESCRIPTION |
|---|---|
| `from_version` | The stream version to compare from. TYPE: `int` |
| `to_version` | The stream version to compare to. TYPE: `int` |
| `resolution` | The "coarseness" of change detection in tree width. DEFAULT: 30 |

| YIELDS | DESCRIPTION |
|---|---|
| `AsyncGenerator[dict[str, int], None]` | A dictionary representing a changed interval. |
Examples:
>>> # Suppose we want to iterate over all changes between version 9 and 42 asynchronously
>>> async for changed_range in stream.changes_iter(from_version=9, to_version=42, resolution=0):
... print("Changed interval:", changed_range)
>>> # Suppose we want to iterate over changes between version 9 and the latest version (often 0)
>>> async for changed_range in stream.changes_iter(from_version=9, to_version=0, resolution=0):
... print("Changed interval:", changed_range)
count (async)
count(
start: Optional[int] = None,
end: Optional[int] = None,
version: Optional[int] = None,
precise: bool = False,
) -> int
Get the total count of raw measurements that are present in the stream.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Bound the lower end of this query by a start time, by default MINIMUM_TIME. TYPE: `Optional[int]` |
| `end` | Bound the upper end of this query by an end time, by default MAXIMUM_TIME. TYPE: `Optional[int]` |
| `version` | Version of the stream to query against, by default None, which means a version of 0 is used. TYPE: `Optional[int]` |
| `precise` | Do we need an exact count or is an estimate reasonable, by default False. TYPE: `bool` |

| RETURNS | DESCRIPTION |
|---|---|
| `int` | Count of points in the stream. |
delete (async)
"Delete" all points between `[start, end)`.
"Delete" all points between start (inclusive) and end (exclusive), both in nanoseconds.
This is a soft delete
The PingThings timeseries data structure has persistent multiversioning. This means that the deleted points will still exist as part of an older version of the stream.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Time to begin deleting points in the stream (inclusive). |
| `end` | Time to stop deleting points in the stream (exclusive). |

| RETURNS | DESCRIPTION |
|---|---|
| `int` | The updated version number of the stream. |
earliest (async)
flush (async)
flush() -> int
Force a flush of the buffered data to persistent storage.
If data was present, the version number will be positively incremented.
| RETURNS | DESCRIPTION |
|---|---|
| `int` | The major version of the stream after the flush. |
get_latest_version (async)
get_latest_version() -> int
Get the current version of the stream.
| RETURNS | DESCRIPTION |
|---|---|
| `int` | The version number of the stream. |
get_retention (async)
Get the retention policy of the stream.
| PARAMETER | DESCRIPTION |
|---|---|
| *(name elided)* | Should the returned dict be copied or passed by reference, by default True. TYPE: `bool` |
| *(name elided)* | Do we need to get the latest metadata from the platform first, by default True. |

| RETURNS | DESCRIPTION |
|---|---|
| `dict[str, Any]` | Retention policy. |
Examples:
>>> await stream_a.get_retention()
{}
>>> await stream_b.get_retention()
{'remove_older_than': datetime.timedelta(hours=2)}
get_version_at_time (async)
insert (async)
insert(
data: Table | RecordBatch,
merge_policy: MergePolicy = "replace",
) -> int
Add new timeseries data to the stream.
Data must follow a specific schema for insertion
Your pyarrow table or record batch must have a schema that matches TIME_VALUE_F64_SCHEMA. The order of the columns matters.
Default merge policy has changed!
Starting with the new pingthings api, the default merge policy is now "replace".
Please refer to the merge policy docs.
| PARAMETER | DESCRIPTION |
|---|---|
| `data` | A pyarrow table or record batch of time series data. TYPE: `Table \| RecordBatch` |
| `merge_policy` | How the database should handle data with the same timestamp, by default "replace". TYPE: `MergePolicy` |

| RETURNS | DESCRIPTION |
|---|---|
| `int` | The version of the stream after data insertion; this number can increment multiple times depending on how many points are inserted. |
latest (async)
nearest (async)
nearest(
time: int | datetime | Timestamp,
backward: bool = False,
version: Optional[int] = None,
) -> Optional[Point]
Get the nearest datapoint at time with version.
| PARAMETER | DESCRIPTION |
|---|---|
| `time` | The time to query for the nearest point. TYPE: `int \| datetime \| Timestamp` |
| `backward` | Can the query find the closest time that is before `time`, by default False. TYPE: `bool` |
| `version` | Version of the stream to query against, by default None, which means version 0 will be used. TYPE: `Optional[int]` |

| RETURNS | DESCRIPTION |
|---|---|
| `Optional[Point]` | The time,value Point that is closest to the time in question, if available. |
obliterate (async)
obliterate() -> None
Completely remove the stream from the platform.
Will delete data!
Obliterating the stream will remove the stream, as well as all of its data!
The stream will no longer be accessible afterwards, so be absolutely sure you want to do this.
Admin privileges are required to obliterate streams.
pin_version (async)
Set the version of the stream to a specific version number; useful when you want reproducible data queries.
Default behavior is to pin the stream to the latest version number when this method is executed.
Useful version number
If you do not want to pin the stream to a version and want to instead always use the latest version of the stream, which
includes data that is being streamed into the platform, use a version number of 0. This is a "magic" value which
tells the platform to always use the latest data it can find.
| PARAMETER | DESCRIPTION |
|---|---|
| `version` | Version number to pin the stream to, by default None, which will pin the stream to its latest version, as returned by `get_latest_version()`. |

| RETURNS | DESCRIPTION |
|---|---|
| `int` | The version pinned. |
pinned_version
pinned_version() -> int
Return the pinned version of the stream.
| RETURNS | DESCRIPTION |
|---|---|
| `int` | The version of the stream it is pinned to. |
precise_windows (async)
precise_windows(
start: int | datetime,
end: int | datetime,
width: int,
depth: Optional[int] = 0,
version: Optional[int] = None,
schema: Optional[Schema] = STAT_F32_SCHEMA,
) -> Table
Return custom-sized statistical summaries of the data.
Understanding windows queries

- `windows` returns arbitrary precision statistical summary windows from the platform. It is slower than `aligned_windows`, but can be significantly faster than raw value queries (`raw_values`).
- Each returned window will be `width` nanoseconds long.
- `start` is inclusive, but `end` is exclusive (e.g., if `end < start + width` you will get no results).
- Results will be returned for all windows that start at a time less than the `end` timestamp.
- If (`end` - `start`) is not a multiple of `width`, then `end` will be decreased to the greatest value less than `end` such that (`end` - `start`) is a multiple of `width` (i.e., we set `end = start + width * floordiv(end - start, width)`).
- Windows that have no data points (`count == 0`) will be omitted from the returned table.
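The `end` adjustment in the last bullet can be mirrored in plain Python. The helper name is ours, for illustration only:

```python
def truncate_end(start: int, end: int, width: int) -> int:
    """Shrink `end` so that [start, end) spans a whole number of
    `width`-nanosecond windows, as documented for windows queries."""
    return start + width * ((end - start) // width)

# A 105 ns span with 10 ns windows keeps 10 full windows: end becomes 100.
print(truncate_end(0, 105, 10))  # 100
```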
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime` |
| `width` | The size of statistical windows to return; each window will contain summaries of the data it spans. TYPE: `int` |
| `depth` | The maximum accuracy/computation tradeoff for the statistical summaries, measured by how far down the tree the query walks. By default 0, which is the most accurate; a range of 0 to 63 can be used depending on the time range of the data. TYPE: `Optional[int]` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What `Schema` the returned data should use, by default STAT_F32_SCHEMA. TYPE: `Optional[Schema]` |

| RETURNS | DESCRIPTION |
|---|---|
| `Table` | The statistical summary information of the stream as a timeseries. |
precise_windows_iter (async)
precise_windows_iter(
start: int | datetime,
end: int | datetime,
width: int,
depth: int = 0,
version: Optional[int] = None,
schema: Optional[Schema] = STAT_F32_SCHEMA,
) -> AsyncGenerator[RecordBatch, None]
Return custom-sized statistical summaries of the data.
Understanding windows queries

- `windows` returns arbitrary precision statistical summary windows from the platform. It is slower than `aligned_windows`, but can be significantly faster than raw value queries (`raw_values`).
- Each returned window will be `width` nanoseconds long.
- `start` is inclusive, but `end` is exclusive (e.g., if `end < start + width` you will get no results).
- Results will be returned for all windows that start at a time less than the `end` timestamp.
- If (`end` - `start`) is not a multiple of `width`, then `end` will be decreased to the greatest value less than `end` such that (`end` - `start`) is a multiple of `width` (i.e., we set `end = start + width * floordiv(end - start, width)`).
- Windows that have no data points (`count == 0`) will be omitted from the returned table.
Memory efficient queries
For long time ranges with small window widths (many statistical summaries), using the _iter methods is significantly more memory efficient.
These methods return data in batches, which are guaranteed to be returned in sorted order, allowing you to query much larger time ranges.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime` |
| `width` | The size of statistical windows to return; each window will contain summaries of the data it spans. TYPE: `int` |
| `depth` | The maximum accuracy/computation tradeoff for the statistical summaries, measured by how far down the tree the query walks. By default 0, which is the most accurate; a range of 0 to 63 can be used depending on the time range of the data. TYPE: `int` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What `Schema` the returned data should use, by default STAT_F32_SCHEMA. TYPE: `Optional[Schema]` |

| YIELDS | DESCRIPTION |
|---|---|
| `AsyncGenerator[RecordBatch, None]` | The statistical summary information of the stream as a timeseries. |
raw_values (async)
raw_values(
start: int | datetime,
end: int | datetime,
version: Optional[int] = None,
schema: Optional[Schema] = TIME_VALUE_F32_SCHEMA,
) -> Table
Return the raw time,value pairs of data from the stream between start and end.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What `Schema` the returned data should use, by default TIME_VALUE_F32_SCHEMA. TYPE: `Optional[Schema]` |

| RETURNS | DESCRIPTION |
|---|---|
| `Table` | A table of timeseries data in the interval of [start, end). |
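Since `start` and `end` accept either integers (epoch nanoseconds) or datetime objects, an explicit conversion with the standard library can make queries unambiguous. The helper below is our illustration, not part of the client:

```python
from datetime import datetime, timezone

def to_nanoseconds(dt: datetime) -> int:
    """Convert an aware datetime to epoch nanoseconds (illustrative helper)."""
    return int(dt.timestamp() * 1_000_000_000)

start_ns = to_nanoseconds(datetime(2024, 1, 1, tzinfo=timezone.utc))
end_ns = to_nanoseconds(datetime(2024, 1, 2, tzinfo=timezone.utc))
print(end_ns - start_ns)  # 86400000000000 (one day in nanoseconds)
```

Passing timezone-aware datetimes avoids surprises from local-time interpretation of naive datetimes.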
raw_values_iter (async)
raw_values_iter(
start: int | datetime,
end: int | datetime,
version: Optional[int] = None,
schema: Optional[Schema] = TIME_VALUE_F32_SCHEMA,
) -> AsyncGenerator[RecordBatch, None]
Return the raw timeseries data from the stream between start and end, but as an iterator instead of buffering all at once.
Memory efficient queries
If you are working with a lot of raw data and can afford to do processing in batches (which are guaranteed to return in sorted order),
the _iter based methods are much more memory efficient and allow you to query much larger ranges of time.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What `Schema` the returned data should use, by default TIME_VALUE_F32_SCHEMA. TYPE: `Optional[Schema]` |

| YIELDS | DESCRIPTION |
|---|---|
| `AsyncGenerator[RecordBatch, None]` | Batches of timeseries data from the stream in the interval of [start, end). |
Examples:
Query for a large range of data and process in batches.
>>> value_generator = stream.raw_values_iter(start, end)
>>> value_counter = 0
>>> async for batch in value_generator:
...     value_counter += batch.num_rows
>>> print(f"Processed {value_counter} rows")
refresh_metadata (async)
refresh_metadata() -> None
Refreshes the locally cached metadata for a stream from the server.
Queries the BTrDB server for all stream metadata including collection, annotation, and tags.
set_retention (async)
set_retention(
remove_older_than: Optional[datetime] = None,
) -> None
Set retention policy on a stream.
| PARAMETER | DESCRIPTION |
|---|---|
| `remove_older_than` | Trim time period after which the data can be removed, by default None. TYPE: `Optional[datetime]` |
Examples:
Keep the data for only two hours.
>>> conn = pt.connect()
>>> s = conn.stream_from_uuid('96858e5d-626c-11f0-9bfd-d49390044950')
>>> await s.set_retention(datetime.timedelta(hours=2))
tags (async)
Return the tag-based metadata of the stream.
| PARAMETER | DESCRIPTION |
|---|---|
| *(name elided)* | Should the tag dictionary be copied or passed by reference, by default True. TYPE: `bool` |
| *(name elided)* | Do we need to query the platform to get the metadata first, by default True. |

| RETURNS | DESCRIPTION |
|---|---|
| `dict[str, str]` | A mapping of tag metadata key,value pairs. |
update (async)
update(
collection: Optional[str] = None,
tags: Optional[dict[str, str]] = None,
annotations: Optional[dict[str, str]] = None,
replace_tags: Optional[bool] = False,
replace_annotations: Optional[bool] = False,
) -> int
Update the stream metadata.
| PARAMETER | DESCRIPTION |
|---|---|
| `collection` | Change the collection the stream is located under, by default None. TYPE: `Optional[str]` |
| `tags` | Update any tag metadata of the stream, by default None. TYPE: `Optional[dict[str, str]]` |
| `annotations` | Update any non-tag metadata of the stream, by default None. TYPE: `Optional[dict[str, str]]` |
| `replace_tags` | If you want to fully replace the current stream tags, by default False. TYPE: `Optional[bool]` |
| `replace_annotations` | If you want to fully replace the current stream annotations, by default False. TYPE: `Optional[bool]` |