Skip to content

AsyncClient

pingthings.timeseries.async_client.AsyncClient

AsyncClient(
    creds: dict[str, str],
    pyo3_client: Client,
    concurrency_limit: Optional[int] = None,
)

Asynchronous representation of the client connection to the timeseries platform.

Advanced user feature

For most customers, using the synchronous client (which will leverage the asynchronous client under the hood) will be sufficient. If you need to leverage the asynchronous functions yourself, this client can be used.

METHOD DESCRIPTION
connect

Connect to the timeseries platform and return an asynchronous client.

create

Create a new stream.

get_collection_properties

Get properties of a collection.

get_device

Returns a device with a matching id in the device table.

get_unit

Returns a unit with a matching id in the unit table.

info

Retrieve information about the server and proxy server the client is connected to.

list_collections

Returns a list of collection paths using the prefix argument for

list_devices

Returns a list of devices the user has authorization to view.

list_units

Returns the list of units in the system.

set_collection_retention

Set retention policy on a collection of streams.

sql_query

Performs a SQL query on the database metadata and returns a list of

stream_from_uuid

Retrieve a stream based on its UUID.

streams_in_collection

Search for streams matching given parameters

streamset_from_uuids

Return an AsyncStreamSet from an iterable of UUIDs.

ATTRIBUTE DESCRIPTION
concurrency_limit

The concurrency limit for background asynchronous operations.

Attributes

concurrency_limit property writable

concurrency_limit

The concurrency limit for background asynchronous operations.

Setting the value overrides the environment variable

If you choose to set a custom concurrency_limit, this will bypass the environment variable PINGTHINGS_CONCURRENCY_LIMIT value.

Functions

connect async staticmethod

connect(
    profile: Optional[str] = None,
    endpoint: Optional[str] = None,
    apikey: Optional[str] = None,
    concurrency_limit: Optional[int] = None,
) -> AsyncClient

Connect to the timeseries platform and return an asynchronous client.

Advanced user feature

For most customers, using the synchronous client (which will leverage the asynchronous client under the hood) will be sufficient. If you need to leverage the asynchronous functions yourself, this function can be useful.

Connecting to the platform for commercial customers

If you are a commercial customer and are using the PingThings JupyterHub/Lab environment to work with the timeseries platform, the relevant connection information has already been added to your session in the form of environment variables. All you need to do to connect to the platform is the following:

import pingthings as pt
async_conn = await pt.timeseries.AsyncClient.connect()

Choosing a concurrency limit

The default concurrency limit is defined in pingthings.timeseries.constants. If the environment variable is not set, it will use this default value. Using a large concurrency limit has the ability to cause large memory consumption, and if the job gets killed, it can also lead to a hard to kill zombie process. Most users will probably be fine with the default concurrency limit, but feel free to explore larger values.

PARAMETER DESCRIPTION
profile

The name of a profile containing the required connection information as found in the user's predictive grid credentials file ${HOME}/.predictivegrid/credentials.yaml.

TYPE: Optional[str] DEFAULT: None

endpoint

The address and port of the cluster to connect to, e.g. 192.168.1.1:4411, if not set, will look for the environment variable $BTRDB_ENDPOINTS

TYPE: Optional[str] DEFAULT: None

apikey

The API key used to authenticate requests, if not set, the key is looked up from the environment variable $BTRDB_API_KEY.

TYPE: Optional[str] DEFAULT: None

concurrency_limit

The maximum number of concurrent database requests to have in flight at any one time, if not set, will be inferred from environment variable $PINGTHINGS_CONCURRENCY_LIMIT.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
AsyncClient

A timeseries client.

Examples:

Connecting to the timeseries platform as a commercial customer in the PingThings provided JupyterHub/Lab environment This behavior also works if you have the environment variables set, refer to the above docstring for more information.

>>> import pingthings as pt
>>> conn = await pt.timeseries.AsyncClient.connect()

Connecting to the timeseries platform when you know your api key and FQDN endpoint.

>>> import pingthings as pt
>>> my_key = "ABC123"
>>> my_endpoint = "example.com:4411"
>>> conn = await pt.timeseries.AsyncClient.connect(apikey=my_key, endpoint=my_endpoint)

Connecting to the platform when you have a populated ${HOME}/.predictivegrid/credentials.yaml file with profiles.

>>> import pingthings as pt
>>> conn = await pt.timeseries.AsyncClient.connect(profile='my_server')

create async

create(
    uuid: UUID,
    collection: str,
    tags: dict[str, str] = {},
    annotations: dict[str, str] = {},
) -> AsyncStream

Create a new stream.

PARAMETER DESCRIPTION
uuid

The UUID for the new stream.

TYPE: UUID

collection

The collection to which the stream belongs.

TYPE: str

tags

Tags associated with the stream.

TYPE: dict[str, str] DEFAULT: {}

annotations

Annotations for the stream.

TYPE: dict[str, str] DEFAULT: {}

RETURNS DESCRIPTION
AsyncStream

The newly created stream.

get_collection_properties async

get_collection_properties(
    collection: str,
) -> dict[str, Any]

Get properties of a collection.

PARAMETER DESCRIPTION
collection

The name of the collection.

TYPE: str

Examples:

Get the retention policy of a collection.

>>> conn.get_collection_properties("bar")
{'retention': {'remove_older_than': datetime.timedelta(days=7)}}

get_device async

get_device(device_id: int) -> dict[str, Any]

Returns a device with a matching id in the device table.

PARAMETER DESCRIPTION
device_id

Integer corresponding to the id of the device in the database.

TYPE: int

Returns: Dictionary with fields corresponding to the values in the database.

get_unit async

get_unit(unit_id: int) -> dict[str, Any]

Returns a unit with a matching id in the unit table.

PARAMETER DESCRIPTION
unit_id

Integer corresponding to the id of the unit in the database.

TYPE: int

Returns: Dictionary with fields corresponding to the values in the database.

info async

info() -> dict[Any, Any]

Retrieve information about the server and proxy server the client is connected to.

RETURNS DESCRIPTION
dict[Any, Any]

A dictionary containing server and proxy server information.

list_collections async

list_collections(prefix: Optional[str] = None) -> list[str]

Returns a list of collection paths using the prefix argument for filtering.

PARAMETER DESCRIPTION
prefix

Filter collections that start with the string provided, if none passed, will list all collections.

DEFAULT: None

RETURNS DESCRIPTION
list[str]

All collections that match the provided prefix.

Examples:

Assuming we have the following collections in the platform: foo, bar, foo/baz, bar/baz

>>> conn = pt.connect()
>>> conn.list_collections().sort()
["bar", "bar/baz", "foo", "foo/bar"]
>>> conn.list_collections(prefix="foo")
["foo", "foo/bar"]
>>> conn.list_collections(prefix="moo")
[]

list_devices async

list_devices() -> list[dict[str, Any]]

Returns a list of devices the user has authorization to view.

RETURNS DESCRIPTION
list[dict[str, Any]]

List of dictionaries with each dictionary corresponding to a single device.

list_units async

list_units() -> list[dict[str, Any]]

Returns the list of units in the system.

RETURNS DESCRIPTION
list[dict[str, Any]]

List of dictionaries with each dictionary corresponding to a single unit.

set_collection_retention async

set_collection_retention(
    collection: str,
    override_per_stream=False,
    remove_older_than: Optional[datetime] = None,
) -> None

Set retention policy on a collection of streams.

PARAMETER DESCRIPTION
collection

The name of the collection.

override_per_stream

Whether stream-specific retention policy should be overridden.

DEFAULT: False

remove_older_than

Trim time period - after which the data will get removed. Not specifying this parameter disables the trimming.

TYPE: Optional[datetime] DEFAULT: None

Examples:

Keep the data for only one week.

>>> conn = pt.connect()
>>> conn.list_collections()
["bar", "foo"]
>>> conn.set_collection_retention("bar", false, datetime.timedelta(days=7))

sql_query async

sql_query(
    query: str, params: Optional[list[str]] = None
) -> list[Any]

Performs a SQL query on the database metadata and returns a list of dictionaries from the resulting cursor.

PARAMETER DESCRIPTION
query

A SQL statement to be executed on the BTrDB metadata. Available columns in the stream table are noted below. To sanitize inputs use a $1 style parameter such as select * from streams where name = $1 or name = $2.

TYPE: str

params

A list of parameter values to be sanitized and interpolated into the SQL statement. Using parameters forces value/type checking and is considered a best practice at the very least.

TYPE: Optional[list[str]] DEFAULT: None

RETURNS DESCRIPTION
list[Any]

The result of the SQL query.

Available columns in the stream table

column_name data_type
uuid uuid
collection character varying
name character varying
unit character varying
ingress character varying
property_version bigint
annotations hstore
distiller character varying
created_at timestamp with time zone
updated_at timestamp with time zone
geo postgis geometry
last_written timestamp with time zone
previous_last_written timestamp with time zone
estimated_count_delta bigint
long_term_autoregressive_average double precision
count_last_updated timestamp with time zone
previous_count_last_updated timestamp with time zone
watched boolean

stream_from_uuid

stream_from_uuid(uuid: UUID | str) -> AsyncStream

Retrieve a stream based on its UUID.

PARAMETER DESCRIPTION
uuid

The UUID of the stream.

TYPE: UUID | str

RETURNS DESCRIPTION
AsyncStream

The stream associated with the provided UUID.

RAISES DESCRIPTION
TypeError

If the provided uuid is not a valid UUID

streams_in_collection async

streams_in_collection(
    collection: str = "",
    is_collection_prefix: bool = True,
    tags: Optional[dict[str, str]] = None,
    annotations: Optional[dict[str, str]] = None,
) -> AsyncStreamSet

Search for streams matching given parameters

PARAMETER DESCRIPTION
collection

collections to use when searching for streams, case sensitive.

TYPE: str DEFAULT: ''

is_collection_prefix

Whether the collection is a prefix of the whole collection name.

TYPE: bool DEFAULT: True

tags

The tags to identify the stream.

TYPE: Optional[dict[str, str]] DEFAULT: None

annotations

The annotations to identify the stream.

TYPE: Optional[dict[str, str]] DEFAULT: None

RETURNS DESCRIPTION
AsyncStreamSet

The grouping of streams matching given parameters.

streamset_from_uuids async

streamset_from_uuids(
    uuids: list[UUID | str], fetch_metadata: bool = True
) -> AsyncStreamSet

Return an AsyncStreamSet from an iterable of UUIDs.

PARAMETER DESCRIPTION
uuids

List of stream identifiers

TYPE: list[UUID | str]

fetch_metadata

Whether to fetch metadata for the streams in the set. Default is True.

TYPE: bool DEFAULT: True

Advanced user feature

Be cautious about using fetch_metadata=False. Many stream metadata values like collection, name, unit, tags, annotations will not be available, meaning filtering and other operations that require metadata will not work.

RETURNS DESCRIPTION
AsyncStreamSet

The AsyncStreamSet associated with the provided iterable of UUIDs.