Streaming checklists in Nominal with Python

Overview

Nominal’s streaming checklists allow you to continuously monitor real-time data from a connection and trigger alerts whenever certain conditions are met. This guide walks you through the process of setting up and running a streaming checklist in Python using the Nominal client.

To run a streaming checklist and receive notifications, the steps below create or select the following, and then stream data to trigger notifications:

  • A Connection.
  • A Run, which is needed to create a Checklist in the web GUI, where it serves as a preview for the checklist.
  • An Asset.
  • A Checklist.
  • An Integration that receives the notifications.

Prerequisites

Make sure you have the nominal Python package installed. You can install it using:

pip3 install nominal

Connect to Nominal

Get your Nominal API token from your User settings page.

See the Quickstart for more details on connecting to Nominal from Python.

import nominal.nominal as nm

nm._config.set_token(
    url='https://api.gov.nominal.io/api',
    token='* * *',  # Replace with your Access Token from
                    # https://app.gov.nominal.io/settings/user?tab=tokens
)

If you’re not sure whether your company has a Nominal tenant, please reach out to us.
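
Rather than pasting the access token into source code, it can be read from the environment. The following is a minimal sketch of that idea; the helper name load_token and the variable name NOMINAL_TOKEN are assumptions made for illustration and are not part of the Nominal client.

```python
import os


def load_token(env_var: str = "NOMINAL_TOKEN") -> str:
    """Return the API token stored in the given environment variable.

    The default variable name NOMINAL_TOKEN is an assumption for this sketch.
    """
    token = os.environ.get(env_var, "").strip()
    if not token:
        raise RuntimeError(f"Set the {env_var} environment variable to your access token")
    return token
```

The returned string could then be passed as the token argument of the set_token call shown above, which keeps the secret out of version control.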

Create a Connection to Nominal’s Internal Time Series Database

You only need to run the code below to create a connection once!

Moreover, you may not even need to create a new connection if you or a colleague has already set one up. Check the connections page for existing connections where Type = nominal and Status = Connected.

To stream data to a Connection, first create a connection to Nominal’s internal time series database. The datasource_id must be unique within your organization, and connection_name is a friendly name that will appear on the Nominal platform under all connections.

connection = nm.create_streaming_connection(
    datasource_id="my-unique-datasource-id",
    connection_name="my-connection-name"
)

You can access this connection later using its rid. The rid is available as connection.rid in the code above, or you can look it up in the “Connections” section of the Nominal platform. Pass it to the nm.get_connection() function to retrieve the connection.
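
Because the connection only needs to be created once, one option is to store its rid locally and reuse it on later runs. The sketch below shows that pattern in a library-independent way; load_or_create_rid and the JSON cache file are hypothetical helpers, and the create argument stands in for whatever call produces the rid.

```python
import json
from pathlib import Path
from typing import Callable


def load_or_create_rid(cache_file: Path, create: Callable[[], str]) -> str:
    """Return the cached rid, calling create() only when no cache file exists."""
    if cache_file.exists():
        return json.loads(cache_file.read_text())["rid"]
    rid = create()
    cache_file.write_text(json.dumps({"rid": rid}))
    return rid
```

In this guide, create could be a function that calls nm.create_streaming_connection(...) and returns connection.rid, and the returned rid could then be handed to nm.get_connection().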

Create a Run and Stream Test Data

Here, we’ll create a temporary run, associate it with our connection, and then stream some initial data.

from datetime import datetime, timedelta

# Three repetitions of a pattern: two 1s followed by twelve 0s
values = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] * 3
interval = timedelta(seconds=2)
# Backdate the series so that it ends just before the current time
start = datetime.now() - len(values) * interval
timestamps = [start + i * interval for i in range(len(values))]

temp_run = nm.create_run(
    name='temporary_run',
    start=start,
    end=start + timedelta(hours=1),
)
temp_run.add_connection("my_ref", connection)

with connection.get_nominal_write_stream(batch_size=len(values)) as stream:
    stream.enqueue_batch(channel_name="my_channel_name", timestamps=timestamps, values=values)
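
To see what the timestamp arithmetic above produces, the same calculation can be run on its own without a connection. This sketch replaces datetime.now() with a fixed reference time (an assumption made only so the result is reproducible) and checks the length and span of the generated series.

```python
from datetime import datetime, timedelta

values = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] * 3
interval = timedelta(seconds=2)

# A fixed reference time stands in for datetime.now() in this sketch.
now = datetime(2024, 1, 1, 12, 0, 0)
start = now - len(values) * interval
timestamps = [start + i * interval for i in range(len(values))]

span = timestamps[-1] - timestamps[0]   # time covered by the series
gap_to_now = now - timestamps[-1]       # distance from last point to the reference time
print(len(values), span, gap_to_now)
```

The series holds 42 points covering 82 seconds, and the last point sits one interval (2 seconds) before the reference time, so the backfilled data ends just before the moment the script runs.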

Create an Asset

asset = nm.create_asset(
    name='streaming_checklist_asset'
)
asset.add_connection("my_ref", connection)

Create the Checklist in the Nominal Web UI

[Image: Checklist preview]

  1. Go to the Checklist Page.
  2. Click on New checklist.
  3. Select the ‘temporary_run’ you created above.
  4. Enter a name and a priority for the first check, then click Add check.
  5. Define a check:
    • In the field next to when, select my_channel_name.
    • In the next field, select >.
  6. You should now see the violations in the chart. Zoom in if necessary.
  7. Click on Publish in the top-right corner.
  8. From the dropdown next to the checklist name, copy the rid of the checklist.
  9. Paste the rid in the code below.

checklist_rid = "paste_rid_here"
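
As a rough local approximation of what a "greater than" check flags on the test data, the sketch below groups consecutive values above a threshold into spans. It is not how Nominal evaluates checks. The function find_violations is hypothetical, and the threshold of 0.5 is an assumption chosen because the test data contains only 0 and 1; the steps above do not state a threshold.

```python
from typing import List, Sequence, Tuple


def find_violations(values: Sequence[float], threshold: float) -> List[Tuple[int, int]]:
    """Return (first_index, last_index) for each run of consecutive values above threshold."""
    spans: List[Tuple[int, int]] = []
    run_start = None
    for i, value in enumerate(values):
        if value > threshold:
            if run_start is None:
                run_start = i  # a new run of violating values begins here
        elif run_start is not None:
            spans.append((run_start, i - 1))
            run_start = None
    if run_start is not None:
        # the series ended while still above the threshold
        spans.append((run_start, len(values) - 1))
    return spans


values = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] * 3
print(find_violations(values, threshold=0.5))  # [(0, 1), (14, 15), (28, 29)]
```

With the test data streamed earlier, this yields three short spans, one at the start of each repetition of the pattern, which is the shape to look for in the chart.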

Select an Integration for Notifications

A streaming checklist needs an integration to send alerts to. You can find the available integrations on the Integrations Page. Copy the rid of the one you want to use and paste it in the code below.

integration_rid = "paste_rid_here"

[Image: integrations]

Execute the Streaming Checklist

With the checklist and asset ready, and an integration selected, you can now execute the streaming checklist:

checklist = nm.get_checklist(checklist_rid)

checklist.execute_streaming(
    assets=[asset],
    integration_rids=[integration_rid],
)
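
Both rids in this guide start out as the placeholder string "paste_rid_here", so it can help to fail early if one was never replaced. A minimal sketch of such a guard follows; require_rid is a hypothetical helper, not part of the Nominal client.

```python
def require_rid(value: str, name: str) -> str:
    """Return value unchanged, or raise if it is empty or still the placeholder."""
    if not value or value == "paste_rid_here":
        raise ValueError(f"{name} has not been set; paste the rid copied from the Nominal UI")
    return value
```

For example, calling require_rid(checklist_rid, "checklist_rid") before nm.get_checklist() would surface a forgotten placeholder with a clear message instead of a lookup error.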

Monitoring and Managing Streaming Checklists

  • List running streaming checklists:
    list(nm.list_streaming_checklists())
  • Stop a running streaming checklist:
    checklist.stop_streaming()
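
If a script starts a streaming checklist and then fails partway through, the checklist keeps running until it is stopped explicitly. One way to pair the start and stop calls is a small context manager, sketched below. The wrapper named streaming is hypothetical; it relies only on the execute_streaming and stop_streaming methods shown in this guide and works with any object that provides them.

```python
from contextlib import contextmanager


@contextmanager
def streaming(checklist, assets, integration_rids):
    """Start the streaming checklist and stop it again when the block exits."""
    checklist.execute_streaming(assets=assets, integration_rids=integration_rids)
    try:
        yield checklist
    finally:
        # Runs on normal exit and on exceptions alike
        checklist.stop_streaming()
```

Used as "with streaming(checklist, [asset], [integration_rid]):", the data-sending code would go inside the block, and the checklist would be stopped even if that code raises.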

Stream More Data to Trigger Notifications

Now that your streaming checklist is active, send more data to trigger notifications. The following code continuously sends data points every 2 seconds. When conditions are met, notifications will be sent to the configured integration.

from datetime import datetime, timedelta
from time import sleep

values = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
interval = timedelta(seconds=2)

with connection.get_nominal_write_stream(batch_size=1) as stream:
    # Repeat the pattern 100 times, sending one point per interval
    for value in values * 100:
        now = datetime.now()
        print(f"Sending data point at {now}: {value}")
        stream.enqueue(channel_name="my_channel_name", timestamp=now, value=value)
        sleep(interval.total_seconds())
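
Sleeping for a fixed 2 seconds after each send means the real period is 2 seconds plus however long the send takes, so the schedule slowly drifts. If a steadier cadence matters, the sends can be scheduled against a fixed start time instead. The sketch below is one way to do that; send_paced is a hypothetical helper, and the clock and sleep parameters exist only so the pacing can be tested without waiting.

```python
import time
from typing import Callable, Iterable


def send_paced(
    values: Iterable[float],
    send: Callable[[float], None],
    interval_s: float = 2.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Call send(value) once per interval, scheduling against a fixed start time."""
    start = clock()
    for i, value in enumerate(values):
        # Wait until the i-th slot; skip the wait if we are already late.
        delay = start + i * interval_s - clock()
        if delay > 0:
            sleep(delay)
        send(value)
```

In this guide, send could be a small function that calls stream.enqueue(channel_name="my_channel_name", timestamp=datetime.now(), value=value) inside the write-stream block.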

Example Alert (Slack): You should now receive real-time notifications through your chosen integration. For Slack, the messages might look like this:

[Image: Slack integration]
