Stream
pingthings.timeseries.client.Stream
¶
Stream(
conn: Client,
stream: Optional[AsyncStream] = None,
uuid: Optional[UUID] = None,
known_to_exist: bool = False,
collection: Optional[str] = None,
tags: Optional[dict[str, str]] = None,
annotations: Optional[dict[str, str]] = None,
property_version: int = 0,
)
The base representation of a timeseries signal in the platform.
| PARAMETER | DESCRIPTION |
|---|---|
| `conn` | An instance of the client connection to the platform database. TYPE: `Client` |
| `stream` | Asynchronous representation of this timeseries signal, by default None. TYPE: `Optional[AsyncStream]` |
| `uuid` | Unique identifier of the timeseries signal, by default None. TYPE: `Optional[UUID]` |
| `known_to_exist` | If this stream is known to already exist, some roundtrips to the database can be skipped, by default False. TYPE: `bool` |
| `collection` | The collection string that the signal belongs to, by default None. TYPE: `Optional[str]` |
| `tags` | Tag-based metadata of the signal, by default None. TYPE: `Optional[dict[str, str]]` |
| `annotations` | Non-tag metadata of the signal, by default None. TYPE: `Optional[dict[str, str]]` |
| `property_version` | Internal version of the signal, by default 0. TYPE: `int` |
| RETURNS | DESCRIPTION |
|---|---|
| `None` | The timeseries representation. |
| METHOD | DESCRIPTION |
|---|---|
| `aligned_windows_iter` | Return statistical summary timeseries data aligned with the internal tree structure of the database. |
| `annotations` | Return the metadata that is not present in the tag-metadata of the stream. |
| `changes` | Return a list of intervals of time that have changed between two versions. |
| `changes_iter` | Return the intervals of time that have changed between two versions as a generator. |
| `count` | Get the total count of raw measurements that are present in the stream. |
| `delete` | "Delete" all points between `[start, end)`. |
| `earliest` | Find the earliest point (in time) that is present in the stream. |
| `flush` | Force a flush of the buffered data to persistent storage. |
| `get_latest_version` | Get the current version of the stream. |
| `get_retention` | Get the retention policy of the stream. |
| `get_version_at_time` | Return the version of the stream at the time provided. |
| `insert` | Add new timeseries data to the stream. |
| `latest` | Find the latest point (in time) that is present in the stream. |
| `nearest` | Get the nearest datapoint at `time` with `version`. |
| `obliterate` | Completely remove the stream from the platform. |
| `pin_version` | Set the version of the stream to a specific version number, useful when wanting reproducible data queries. |
| `pinned_version` | Return the pinned version of the stream. |
| `precise_windows_iter` | Return custom-sized statistical summaries of the data. |
| `raw_values` | Return the raw time,value pairs of data from the stream between `start` and `end`. |
| `raw_values_iter` | Return the raw timeseries data from the stream between `start` and `end` as a generator. |
| `refresh_metadata` | Retrieve and update all metadata for this stream. |
| `set_retention` | Set the retention policy on a stream. |
| `tags` | Return the tag-based metadata of the stream. |
| `update` | Update the stream metadata. |
| `windowed_values` | Return statistical summaries of the data. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `collection` | Return the collection the stream belongs to. TYPE: `str \| None` |
| `name` | Return the name of the stream. TYPE: `str` |
| `uuid` | Return the unique identifier of the stream. TYPE: `UUID` |
Attributes¶
collection
property
¶
collection: str | None
Return the collection the stream belongs to.
| RETURNS | DESCRIPTION |
|---|---|
| `str \| None` | The collection of the stream. |
name
property
¶
name: str
Return the name of the stream.
| RETURNS | DESCRIPTION |
|---|---|
| `str` | The name of the stream. |
uuid
property
¶
uuid: UUID
Return the unique identifier of the stream.
| RETURNS | DESCRIPTION |
|---|---|
| `UUID` | The unique identifier of the stream. |
Functions¶
aligned_windows_iter
¶
aligned_windows_iter(
start: int | datetime | Timestamp,
end: int | datetime | Timestamp,
point_width: Optional[int] = None,
width: Optional[int | timedelta | Timedelta] = None,
version: Optional[int] = None,
schema: Optional[Schema] = STAT_F32_SCHEMA,
) -> Generator[Table, None, None]
Return statistical summary timeseries data aligned with the internal tree structure of the database.
Query BTrDB for aggregates (or roll ups or windows) of the time series with version between time start (inclusive) and end (exclusive) in nanoseconds [start, end).
Each point returned is a statistical aggregate of all the raw data within a window of width 2**pointwidth nanoseconds.
These statistical aggregates currently include the mean, minimum, maximum, count, and standard deviation of the data composing the window.
Understanding aligned_windows queries
- `start` is inclusive, but `end` is exclusive. Results will be returned for all windows that start in the interval \([start, end)\).
- If \(end < start + 2^{pointwidth}\), you will not get any results.
- If `start` and `end` are not powers of two, the bottom `pointwidth` bits will be cleared, aligning them to the nearest multiple of \(2^{pointwidth}\). For example, if you query data between `[31, 121)` with `pointwidth = 4`, the actual query will be performed on `[16, 112)`.
- Each window will contain statistical summaries of the data within it.
- Statistical points with `count == 0` will be omitted.
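The bit-clearing alignment can be sketched in plain Python (the helper name `align` is illustrative, not part of the client API):

```python
def align(t: int, pointwidth: int) -> int:
    # Clear the bottom `pointwidth` bits, snapping t down to
    # the nearest multiple of 2**pointwidth.
    return t & ~((1 << pointwidth) - 1)

# A [31, 121) query with pointwidth = 4 is actually run on [16, 112).
aligned = (align(31, 4), align(121, 4))
```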
Memory efficient queries
For long time ranges with small window widths (many statistical summaries), using the _iter methods is significantly more memory efficient.
These methods return data in batches, which are guaranteed to be returned in sorted order, allowing you to query much larger time ranges.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime \| Timestamp` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime \| Timestamp` |
| `point_width` | What size of statistical windows to return, aligned to a size of the internal tree. The value will be interpreted as \(2^{point\_width}\) nanoseconds. TYPE: `Optional[int]` |
| `width` | The size of statistical windows to return; each window will contain summaries of the data within it. TYPE: `Optional[int \| timedelta \| Timedelta]` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What schema the returned tables should use, by default STAT_F32_SCHEMA. TYPE: `Optional[Schema]` |
| YIELDS | DESCRIPTION |
|---|---|
| `Table` | Statistical summary timeseries of the stream between \([start, end)\). |
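Because `point_width` is an exponent, choosing one for a target window size amounts to taking the floor of a base-2 logarithm. A minimal sketch (the helper `pointwidth_for` is an assumption for illustration, not a client function):

```python
def pointwidth_for(window_ns: int) -> int:
    # Largest p such that 2**p <= window_ns; each returned
    # window will then cover 2**p nanoseconds.
    return window_ns.bit_length() - 1

# Roughly one-second windows: 2**29 ns (~0.54 s) is the largest
# tree-aligned window that fits inside 1_000_000_000 ns.
p = pointwidth_for(1_000_000_000)

# Hypothetical usage against an existing `stream`:
# for table in stream.aligned_windows_iter(start, end, point_width=p):
#     process(table)
```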
annotations
¶
Return the metadata that is not present in the tag-metadata of the stream.
| PARAMETER | DESCRIPTION |
|---|---|
| | Should the returned dict be copied or passed by reference, by default True. TYPE: `bool` |
| | Do we need to get the latest metadata from the platform first, by default True. TYPE: `bool` |
| RETURNS | DESCRIPTION |
|---|---|
| `Optional[dict[str, str]]` | A mapping of annotation metadata key,value pairs. |
changes
¶
changes(
from_version: int, to_version: int, resolution: int = 0
) -> list[dict[str, int] | None]
Return a list of intervals of time that have changed between two versions.
Unlike the _iter version, this method buffers and returns the changed intervals
in a single list.
Advanced user feature
This method can have unexpected return values if you are not familiar with the underlying timeseries database that is part of the PredictiveGrid platform.
Time ranges can overlap with previously inserted data
The returned time ranges will not be the exact start and end times of the data inserted. Instead, they will be aligned as closely as possible, in alignment with the underlying database's tree structure.
| PARAMETER | DESCRIPTION |
|---|---|
| `from_version` | The stream version to compare from (exclusive). TYPE: `int` |
| `to_version` | The stream version to compare to (inclusive). TYPE: `int` |
| `resolution` | The "coarseness" of change detection in tree width, expressed as a power of two nanoseconds; 0 is the finest resolution. DEFAULT: `0` |
| RETURNS | DESCRIPTION |
|---|---|
| `list[dict[str, int] \| None]` | A list of dictionaries representing changed intervals. |
Examples:
>>> # Suppose we want to iterate over all changes between version 9 and 42
>>> for changed_range in stream.changes(from_version=9, to_version=42, resolution=0):
... print("Changed interval:", changed_range)
>>> # Suppose we want the changes between version 9 and the latest
>>> # version, including data that has not yet been flushed
>>> for changed_range in stream.changes(from_version=9, to_version=0, resolution=0):
...     print("Changed interval:", changed_range)
changes_iter
¶
changes_iter(
from_version: int, to_version: int, resolution: int = 0
) -> Generator[dict[str, int], None, None]
Return the intervals of time that have changed between two versions as a generator.
Memory efficient queries
This method yields each changed interval one at a time. If you need to process
many intervals without storing them all in memory at once, this _iter method is
more memory efficient than its non-iter counterpart.
Advanced user feature
This method can have unexpected return values if you are not familiar with the underlying timeseries database that is part of the PredictiveGrid platform.
Time ranges can overlap with previously inserted data
The returned time ranges will not be the exact start and end times of the data inserted. Instead, they will be aligned as closely as possible, in alignment with the underlying database's tree structure.
| PARAMETER | DESCRIPTION |
|---|---|
| `from_version` | The stream version to compare from. TYPE: `int` |
| `to_version` | The stream version to compare to. TYPE: `int` |
| `resolution` | The "coarseness" of change detection in tree width, expressed as a power of two nanoseconds; 0 is the finest resolution. DEFAULT: `0` |
| YIELDS | DESCRIPTION |
|---|---|
| `dict[str, int]` | A dictionary representing a changed interval. |
Examples:
>>> # Suppose we want to iterate over all changes between version 9 and 42
>>> for changed_range in stream.changes_iter(from_version=9, to_version=42, resolution=0):
... print("Changed interval:", changed_range)
>>> # Suppose we want the changes between version 9 and the latest
>>> # version, including data that has not yet been flushed
>>> for changed_range in stream.changes_iter(from_version=9, to_version=0, resolution=0):
...     print("Changed interval:", changed_range)
count
¶
count(
start: Optional[
int | datetime | Timestamp
] = MINIMUM_TIME,
end: Optional[
int | datetime | Timestamp
] = MAXIMUM_TIME,
version: Optional[int] = None,
precise: Optional[bool] = False,
) -> int
Get the total count of raw measurements that are present in the stream.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Bound the lower end of this query by a start time, by default MINIMUM_TIME. TYPE: `Optional[int \| datetime \| Timestamp]` |
| `end` | Bound the upper end of this query by an end time, by default MAXIMUM_TIME. TYPE: `Optional[int \| datetime \| Timestamp]` |
| `version` | Version of the stream to query against, by default None, which means a version of 0 is used. TYPE: `Optional[int]` |
| `precise` | Do we need an exact count, or is an estimate acceptable, by default False. TYPE: `Optional[bool]` |
| RETURNS | DESCRIPTION |
|---|---|
| `int` | Count of points in the stream. |
delete
¶
"Delete" all points between `start` (inclusive) and `end` (exclusive), both in nanoseconds.
This is a soft delete
The PingThings timeseries data structure has persistent multiversioning. This means that the deleted points will still exist as part of an older version of the stream.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Time to begin deleting points in the stream (inclusive). |
| `end` | Time to stop deleting points in the stream (exclusive). |
| RETURNS | DESCRIPTION |
|---|---|
| `int` | The updated version number of the stream. |
earliest
¶
flush
¶
flush() -> int
Force a flush of the buffered data to persistent storage.
If data was present, the version number will be positively incremented.
| RETURNS | DESCRIPTION |
|---|---|
int
|
The major version of the stream after the flush. |
get_latest_version
¶
get_latest_version() -> int
Get the current version of the stream.
| RETURNS | DESCRIPTION |
|---|---|
int
|
The version number of the stream. |
get_retention
¶
Get the retention policy of the stream.
| PARAMETER | DESCRIPTION |
|---|---|
| | Should the returned dict be copied or passed by reference, by default True. TYPE: `bool` |
| | Do we need to get the latest metadata from the platform first, by default True. TYPE: `bool` |
| RETURNS | DESCRIPTION |
|---|---|
| `Optional[dict[str, Any]]` | The retention policy. |
Examples:
>>> stream_a.get_retention()
{}
>>> stream_b.get_retention()
{'remove_older_than': datetime.timedelta(hours=2)}
get_version_at_time
¶
insert
¶
insert(
data: Table | RecordBatch,
merge_policy: MergePolicy = "never",
) -> int
Add new timeseries data to the stream.
Default merge policy has changed!
Starting with the new pingthings API, the default merge policy is now replace.
Please refer to the merge policy docs.
Data must follow a specific schema for insertion
Your pyarrow table or record batch must have a schema
that matches TIME_VALUE_F64_SCHEMA. The order of the columns matters.
| PARAMETER | DESCRIPTION |
|---|---|
| `data` | A pyarrow table or record batch of time series data. TYPE: `Table \| RecordBatch` |
| `merge_policy` | How the database should handle data with the same timestamp, by default "never". TYPE: `MergePolicy` |
| RETURNS | DESCRIPTION |
|---|---|
| `int` | The version of the stream after data insertion. This number can increment multiple times depending on how many points are inserted. |
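A minimal sketch of preparing data for insertion, assuming timestamps are integer nanoseconds since the Unix epoch. The `to_ns` helper is illustrative (whole seconds only), and the commented calls assume an existing `stream` and pyarrow:

```python
from datetime import datetime, timedelta, timezone

def to_ns(dt: datetime) -> int:
    # Convert an aware datetime (whole seconds) to integer
    # nanoseconds since the Unix epoch.
    return int(dt.timestamp()) * 1_000_000_000

start = datetime(2024, 1, 1, tzinfo=timezone.utc)
times = [to_ns(start + timedelta(seconds=i)) for i in range(3)]
values = [1.0, 2.0, 3.0]

# Hypothetical insertion against an existing `stream`:
# import pyarrow as pa
# table = pa.Table.from_pydict({"time": times, "value": values})
# new_version = stream.insert(table, merge_policy="never")
```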
latest
¶
nearest
¶
nearest(
time: int | datetime | Timestamp,
backward: bool = False,
version: Optional[int] = None,
) -> Optional[Point]
Get the nearest datapoint at time with version.
| PARAMETER | DESCRIPTION |
|---|---|
| `time` | The time to query for the nearest point. TYPE: `int \| datetime \| Timestamp` |
| `backward` | Whether the query should search backward, finding the closest point before `time`, by default False. TYPE: `bool` |
| `version` | Version of the stream to query against, by default None, which means version 0 will be used. TYPE: `Optional[int]` |
| RETURNS | DESCRIPTION |
|---|---|
| `Optional[Point]` | The time,value Point that is closest to the time in question, if available. |
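The `backward` flag can be illustrated with a pure-Python analogue over a sorted list of timestamps. This is a sketch of the lookup rule only, not the client implementation, and it assumes "before" includes an exact match:

```python
import bisect

def nearest_ts(times: list[int], t: int, backward: bool):
    # backward=True: greatest timestamp at or before t;
    # backward=False: smallest timestamp at or after t.
    if backward:
        i = bisect.bisect_right(times, t)
        return times[i - 1] if i > 0 else None
    i = bisect.bisect_left(times, t)
    return times[i] if i < len(times) else None
```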
obliterate
¶
obliterate() -> None
Completely remove the stream from the platform.
Will delete data!
Obliterating the stream removes the stream as well as all of its data!
The stream will no longer be accessible afterward, so make sure you are completely certain before doing this.
Obliterating streams requires Admin privileges.
pin_version
¶
Set the version of the stream to a specific version number, useful when wanting reproducible data queries.
Default behavior is to pin the stream to the latest version number when this method is executed.
Useful version number
If you do not want to pin the stream to a fixed version and instead always want the latest version of the stream, including data that is still streaming into the platform, use a version number of 0. This is a "magic" value that tells the platform to always use the latest data it can find.
| PARAMETER | DESCRIPTION |
|---|---|
| `version` | Version number to pin the stream to, by default None, which will pin the stream to its latest version as returned by `get_latest_version`. |
| RETURNS | DESCRIPTION |
|---|---|
| `int` | The version pinned. |
pinned_version
¶
pinned_version() -> int
Return the pinned version of the stream.
| RETURNS | DESCRIPTION |
|---|---|
int
|
The version of the stream it is pinned to. |
precise_windows_iter
¶
precise_windows_iter(
start: int | datetime | Timestamp,
end: int | datetime | Timestamp,
width: int | timedelta | Timedelta,
depth: int = 0,
version: Optional[int] = None,
schema: Optional[Schema] = STAT_F32_SCHEMA,
) -> Generator[Table, None, None]
Return custom-sized statistical summaries of the data.
Understanding windows queries
- `windows` returns arbitrary-precision statistical summary windows from the platform. It is slower than `aligned_windows`, but can be significantly faster than raw value queries (`raw_values`).
- Each returned window will be `width` nanoseconds long.
- `start` is inclusive, but `end` is exclusive (e.g., if `end < start + width` you will get no results).
- Results will be returned for all windows that start at a time less than the `end` timestamp.
- If (`end` - `start`) is not a multiple of `width`, then `end` will be decreased to the greatest value less than `end` such that (`end` - `start`) is a multiple of `width` (i.e., we set `end = start + width * floordiv(end - start, width)`).
- Windows that have no data points (`count == 0`) will be omitted from the returned table.
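The end-adjustment rule amounts to integer floor division. A sketch only; `effective_end` is not a client function:

```python
def effective_end(start: int, end: int, width: int) -> int:
    # Decrease end so that (end - start) is a multiple of width.
    return start + width * ((end - start) // width)

# A [0, 105) query with width=10 effectively ends at 100.
adjusted = effective_end(0, 105, 10)
```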
Memory efficient queries
For long time ranges with small window widths (many statistical summaries), using the _iter methods is significantly more memory efficient.
These methods return data in batches, which are guaranteed to be returned in sorted order, allowing you to query much larger time ranges.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime \| Timestamp` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime \| Timestamp` |
| `width` | The size of statistical windows to return; each window will contain summaries of the data within it. TYPE: `int \| timedelta \| Timedelta` |
| `depth` | The maximum tradeoff between accuracy of the statistical summaries and how far down the tree the query must walk; by default 0, the most accurate. A range of 0 to 63 can be used based on the time range of the data. TYPE: `int` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What schema the returned tables should use, by default STAT_F32_SCHEMA. TYPE: `Optional[Schema]` |
| YIELDS | DESCRIPTION |
|---|---|
| `Table` | The statistical summary information of the stream as a timeseries. |
raw_values
¶
raw_values(
start: int | datetime | Timestamp,
end: int | datetime | Timestamp,
version: Optional[int] = None,
schema: Optional[Schema] = TIME_VALUE_F32_SCHEMA,
) -> Table
Return the raw time,value pairs of data from the stream between start and end.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime \| Timestamp` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime \| Timestamp` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What schema the returned table should use, by default TIME_VALUE_F32_SCHEMA. TYPE: `Optional[Schema]` |
| RETURNS | DESCRIPTION |
|---|---|
| `Table` | A table of timeseries data in the interval \([start, end)\). |
raw_values_iter
¶
raw_values_iter(
start: int | datetime | Timestamp,
end: int | datetime | Timestamp,
version: Optional[int] = None,
schema: Optional[Schema] = TIME_VALUE_F32_SCHEMA,
) -> Generator[Table, None, None]
Return the raw timeseries data from the stream between start and end, but as an iterator instead of buffering all at once.
Memory efficient queries
If you are working with a lot of raw data and can afford to do processing in batches (which are guaranteed to return in sorted order),
the _iter based methods are much more memory efficient and allow you to query much larger ranges of time.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | Start time to get data (inclusive). TYPE: `int \| datetime \| Timestamp` |
| `end` | End time for the data query (exclusive). TYPE: `int \| datetime \| Timestamp` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What schema the returned tables should use, by default TIME_VALUE_F32_SCHEMA. TYPE: `Optional[Schema]` |
| YIELDS | DESCRIPTION |
|---|---|
| `Table` | Tables of timeseries data from the stream in the interval \([start, end)\). |
Examples:
Query for a large range of data and process in batches.
>>> value_generator = stream.raw_values_iter(start, end)
>>> value_counter = 0
>>> for batch in value_generator:
...     value_counter += batch.num_rows
>>> print(f"Processed {value_counter} rows")
refresh_metadata
¶
refresh_metadata() -> None
Retrieve and update all metadata for this stream.
This will update the tags and annotations for the stream, as well as any other metadata that might not be set during manual instantiation of the Stream object.
set_retention
¶
set_retention(
remove_older_than: Optional[datetime] = None,
) -> None
Set retention policy on a stream.
| PARAMETER | DESCRIPTION |
|---|---|
| `remove_older_than` | Time period after which the data can be removed, by default None. TYPE: `Optional[datetime]` |
Examples:
Keep the data for only two hours.
>>> stream.set_retention(datetime.timedelta(hours=2))
tags
¶
Return the tag-based metadata of the stream.
| PARAMETER | DESCRIPTION |
|---|---|
| | Should the tag dictionary be copied or passed by reference, by default True. TYPE: `bool` |
| | Do we need to query the platform to get the metadata first, by default True. TYPE: `bool` |
| RETURNS | DESCRIPTION |
|---|---|
| `dict[str, str]` | A mapping of tag metadata key,value pairs. |
update
¶
update(
collection: Optional[str] = None,
tags: Optional[dict[str, str]] = None,
annotations: Optional[dict[str, str]] = None,
replace_tags: Optional[bool] = False,
replace_annotations: Optional[bool] = False,
)
Update the stream metadata.
| PARAMETER | DESCRIPTION |
|---|---|
| `collection` | Change the collection the stream is located under, by default None. TYPE: `Optional[str]` |
| `tags` | Update any tag metadata of the stream, by default None. TYPE: `Optional[dict[str, str]]` |
| `annotations` | Update any non-tag metadata of the stream, by default None. TYPE: `Optional[dict[str, str]]` |
| `replace_tags` | Whether to fully replace the current stream tags with the provided `tags`, by default False. TYPE: `Optional[bool]` |
| `replace_annotations` | Whether to fully replace the current stream annotations with the provided `annotations`, by default False. TYPE: `Optional[bool]` |
windowed_values
¶
windowed_values(
start: int | datetime | Timestamp,
end: int | datetime | Timestamp,
width: int | timedelta | Timedelta | _PW,
precise: Optional[bool] = False,
version: Optional[int] = None,
schema: Optional[Schema] = STAT_F32_SCHEMA,
) -> Table
Return statistical summaries of the data.
| PARAMETER | DESCRIPTION |
|---|---|
| `start` | The approximate start time to get data (inclusive). See notes. TYPE: `int \| datetime \| Timestamp` |
| `end` | The approximate end time for the data query (exclusive). See notes. TYPE: `int \| datetime \| Timestamp` |
| `width` | The approximate size of statistical windows to return; each window will contain summaries of the data within it. TYPE: `int \| timedelta \| Timedelta \| _PW` |
| `precise` | Pass in True to use the exact `start`, `end`, and `width` provided instead of tree-aligned approximations, by default False. TYPE: `Optional[bool]` |
| `version` | What version of the stream to query against, by default None. TYPE: `Optional[int]` |
| `schema` | What schema the returned table should use, by default STAT_F32_SCHEMA. TYPE: `Optional[Schema]` |
| RETURNS | DESCRIPTION |
|---|---|
| `Table` | The statistical summary information of the stream as a timeseries. |
Notes
By default (precise=False), the values provided to start, end and width may not fully align with
the values actually used by the query. Instead, the window width used will be the largest power of 2 ns
that is smaller than the provided window. Doing so aligns the query with BTrDB's internal tree structure
and thus increases query performance by several orders of magnitude.
Consequently, the actual time-range of the data pulled will be the aligned values that fall within the range \([start, end)\).
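The default width-snapping described in the notes can be sketched as follows (the helper name is assumed for illustration, and an exact power of two is treated as already aligned):

```python
def snapped_width(width_ns: int) -> int:
    # Largest power of two (in nanoseconds) that does not
    # exceed the requested window width.
    return 1 << (width_ns.bit_length() - 1)

# Requesting 1-second windows actually uses 2**29 ns (~0.54 s) windows.
snapped = snapped_width(1_000_000_000)
```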