Streaming checklists in Nominal with Python

Overview

Nominal’s streaming checklists allow you to continuously monitor real-time data from a connection and trigger alerts whenever certain conditions are met. This guide walks you through the process of setting up and running a streaming checklist in Python using the Nominal client.

To run a streaming checklist and receive notifications, the following steps create or select:

  • A Connection.
  • A Run, needed to create a Checklist in the web GUI. It is used as a preview for the checklist.
  • An Asset.
  • A Checklist.
  • Streamed data that triggers notifications.

Prerequisites

Make sure you have the nominal Python package installed. You can install it using:

pip3 install nominal

Connect to Nominal

Concepts
  • Base URL: The URL through which the Nominal API is accessed (typically https://api.gov.nominal.io/api; shown under Settings → API keys).
  • Workspace: A mechanism for isolating datasets; each user has one or more workspaces, and data in one cannot be seen from another. Note that a token / API key is attached to a user, and may access multiple workspaces.
  • Profile: A combination of base URL, API key, and workspace.

There are two primary ways of authenticating the Nominal Client. The first is to use a profile stored on disk, and the second is to use a token directly.

Run the following in a terminal and follow on-screen prompts to set up a connection profile:

$ nom config profile add default

# Alternatively, if `nom` is missing from the path:
$ python -m nominal config profile add default

Here, “default” can be any name chosen to represent this profile (reminder: a profile represents a base URL, API key, and workspace).

The profile will be stored in ~/.config/nominal/config.yml, and can then be used to create a client:

from nominal.core import NominalClient

client = NominalClient.from_profile("default")

# Get details about the currently logged-in user to validate authentication
# Will display an object like: `User(display_name='your_email@your_company.com', ...)`
print(client.get_user())

If you have previously used nom to store credentials, prior to the availability of profiles, you will need to migrate your old configuration file (~/.nominal.yml) to the new format (~/.config/nominal/config.yml).

You can do this with the following command:

$ nom config migrate

# Or, if `nom` is missing from your path:
$ python -m nominal config migrate

To authenticate with a token directly instead of a profile:

from nominal.core import NominalClient

# Get an instance of the client using provided credentials
client = NominalClient.from_token("<insert api key>")

# Get details about the currently logged-in user to validate authentication
# Will display an object like: `User(display_name='your_email@your_company.com', ...)`
print(client.get_user())

NOTE: you should never share your Nominal API key with anyone. We therefore recommend that you not save it in your code and/or scripts.

  • If you trust the computer you are on, use nom to store the credential to disk.
  • Otherwise, use a password manager such as 1Password or Bitwarden to keep your token safe.

If you’re not sure whether your company has a Nominal tenant, please reach out to us.
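One way to keep the API key out of source files is to read it from an environment variable at runtime. The sketch below is a minimal illustration under stated assumptions: the variable name NOMINAL_API_KEY and the helper load_token are hypothetical and are not part of the Nominal client.

```python
import os

# Hypothetical variable name; pick whatever fits your environment.
TOKEN_ENV_VAR = "NOMINAL_API_KEY"


def load_token(env=None):
    """Return the API key from the environment, or raise a clear error."""
    env = os.environ if env is None else env
    token = env.get(TOKEN_ENV_VAR, "").strip()
    if not token:
        raise RuntimeError(f"Set {TOKEN_ENV_VAR} before running this script")
    return token


# Usage (requires the nominal package and a valid key):
# from nominal.core import NominalClient
# client = NominalClient.from_token(load_token())
```

Failing early with an explicit message avoids passing an empty string to the client and getting a less obvious authentication error later.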

Create a Connection to Nominal’s Internal Time Series Database

You only need to run the code below to create a connection once!

Moreover, you may not even need to create a new connection if you or a colleague has already set one up. Check the connections page for existing connections where Type = nominal and Status = Connected.

To stream data to a Connection, first create a connection to Nominal’s internal time series database. The datasource_id must be unique within your organization, and connection_name is a friendly name that will appear on the Nominal platform under all connections.

# The `nm` alias used throughout this guide refers to the top-level nominal module
import nominal as nm

connection = nm.create_streaming_connection(
    datasource_id="my-unique-datasource-id",
    connection_name="my-connection-name",
)

You can access this connection later using its rid. The rid is available as connection.rid in the code above, or you can look it up in the “Connections” section of the Nominal platform and pass it to the nm.get_connection() function.

Create a Run and Stream Test Data

Here, we’ll create a temporary run, associate it with our connection, and then stream some initial data.

from datetime import datetime, timedelta

# Two high samples followed by twelve zeros, repeated three times
values = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] * 3
interval = timedelta(seconds=2)
# Back-date the series so that the last sample lands just before now
start = datetime.now() - len(values) * interval
timestamps = [start + i * interval for i in range(len(values))]

temp_run = nm.create_run(
    name="temporary_run",
    start=start,
    end=start + timedelta(hours=1),
)
temp_run.add_connection("my_ref", connection)

with connection.get_nominal_write_stream(batch_size=len(values)) as stream:
    stream.enqueue_batch(channel_name="my_channel_name", timestamps=timestamps, values=values)
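The timestamp arithmetic above can be checked locally before anything is sent to Nominal. The following sketch isolates it in a hypothetical helper, backdated_timestamps, and uses a fixed end time (instead of datetime.now()) so the result is reproducible.

```python
from datetime import datetime, timedelta


def backdated_timestamps(count, interval, end):
    """Return `count` timestamps spaced by `interval`; the last is one `interval` before `end`."""
    start = end - count * interval
    return [start + i * interval for i in range(count)]


end = datetime(2024, 1, 1, 12, 0, 0)  # fixed instant, chosen only for illustration
stamps = backdated_timestamps(4, timedelta(seconds=2), end)
print(stamps[0].time(), stamps[-1].time())  # prints: 11:59:52 11:59:58
```

Because every timestamp lies in the past, the whole batch falls inside the run window, which starts at the first timestamp and ends an hour later.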

Create an Asset

asset = nm.create_asset(
    name="streaming_checklist_asset",
)
asset.add_connection("my_ref", connection)

Create the Checklist in the Nominal Web UI

  1. Go to the Checklist Page.
  2. Click New checklist.
  3. Select the ‘temporary_run’ you created above.
  4. Enter a name and a priority for the first check, then click Add check.
  5. Define a check:
    • In the field next to when, select my_channel_name.
    • In the next field, select >.
  6. You should now see the violations in the chart. Zoom in if necessary.
  7. Click Publish in the top-right corner.
  8. From the dropdown next to the checklist name, copy the rid of the checklist.
  9. Paste the rid in the code below.

checklist_rid = "paste_rid_here"
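To anticipate which samples a "greater than" check would flag in the chart, the comparison can be approximated locally. This is only a sketch: find_violations is a hypothetical helper, the threshold of 0.5 is an assumed value that does not come from this guide, and the code does not reflect how Nominal evaluates checks internally.

```python
def find_violations(values, threshold):
    """Return the indices of samples strictly greater than `threshold`."""
    return [i for i, v in enumerate(values) if v > threshold]


# Same test pattern that was streamed to the temporary run
values = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] * 3

# 0.5 is an assumed threshold, used here only for illustration
print(find_violations(values, threshold=0.5))  # prints: [0, 1, 14, 15, 28, 29]
```

With this pattern, the first two samples of each 14-sample cycle exceed the assumed threshold, so the chart would be expected to show three short violation windows.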

Select an Integration for Notifications

A streaming checklist needs an integration to send alerts to. You can find available integrations on the Integrations Page. Copy the rid of the integration you want to use and paste it in the code below.

integration_rid = "paste_rid_here"

Execute the Streaming Checklist

With the checklist and asset ready, and an integration selected, you can now execute the streaming checklist:

checklist = nm.get_checklist(checklist_rid)

checklist.execute_streaming(
    assets=[asset],
    integration_rids=[integration_rid],
)

Monitoring and Managing Streaming Checklists

  • List running streaming checklists:

    list(nm.list_streaming_checklists())

  • Stop a running streaming checklist:

    checklist.stop_streaming()
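If a script starts a streaming checklist and is later interrupted, the checklist may keep running until it is stopped explicitly. One possible pattern, sketched below, wraps the two calls shown in this guide in a context manager so that stop_streaming() runs even when the body raises. The wrapper streaming_checklist is a hypothetical helper and is not part of the Nominal client.

```python
from contextlib import contextmanager


@contextmanager
def streaming_checklist(checklist, assets, integration_rids):
    """Start streaming execution on entry and always stop it on exit."""
    checklist.execute_streaming(assets=assets, integration_rids=integration_rids)
    try:
        yield checklist
    finally:
        # Runs on normal exit, on exceptions, and on Ctrl+C
        checklist.stop_streaming()


# Usage (assumes `checklist`, `asset`, and `integration_rid` from the steps above):
# with streaming_checklist(checklist, [asset], [integration_rid]):
#     ...  # stream data here
```

The helper only relies on the object exposing execute_streaming and stop_streaming, so it can be exercised with a stand-in object before being used against a real checklist.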

Stream More Data to Trigger Notifications

Now that your streaming checklist is active, send more data to trigger notifications. The following code continuously sends data points every 2 seconds. When conditions are met, notifications will be sent to the configured integration.

from datetime import datetime, timedelta
from time import sleep

values = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
interval = timedelta(seconds=2)

with connection.get_nominal_write_stream(batch_size=1) as stream:
    # Repeat the pattern 100 times, sending one point per interval
    for value in values * 100:
        now = datetime.now()
        print(f"Sending data point at {now}: {value}")
        stream.enqueue(channel_name="my_channel_name", timestamp=now, value=value)
        sleep(interval.total_seconds())
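A fixed sleep after each point lets the send cadence drift slightly, because the time spent printing and enqueueing is added on top of the pause. If a steadier cadence matters, one option is to sleep until a scheduled deadline instead. The generator below is a sketch of that idea; paced is a hypothetical helper, and its clock and sleep parameters exist only so the pacing can be tested without waiting.

```python
import time
from datetime import timedelta


def paced(values, interval, clock=time.monotonic, sleep=time.sleep):
    """Yield each value on a fixed schedule, `interval` apart, starting immediately."""
    step = interval.total_seconds()
    next_due = clock()
    for value in values:
        delay = next_due - clock()
        if delay > 0:
            sleep(delay)  # wait only for the time remaining until the deadline
        yield value
        next_due += step


# Usage with the stream from this guide (requires a live connection):
# with connection.get_nominal_write_stream(batch_size=1) as stream:
#     for value in paced(values * 100, timedelta(seconds=2)):
#         stream.enqueue(channel_name="my_channel_name", timestamp=datetime.now(), value=value)
```

Using a monotonic clock for the schedule keeps the pacing unaffected by wall-clock adjustments, while the timestamps sent to Nominal still come from datetime.now().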

Example Alert (Slack)

You should now receive real-time notifications through your chosen integration. For Slack, the messages might look like this:

(Figure: Slack integration)