Satori Docs

Your definitive guide to subscribing and publishing live data

Overview

A streambot lets you define and execute custom logic on each message in a real-time message stream. It works by subscribing to a real-time messaging (RTM) channel and executing your custom code for every message received in that channel. A streambot can also publish content to any RTM stream in its project, and you can set filters on a message stream so that the streambot receives only a desired subset of messages.

You can develop your own streambots entirely on the Satori platform: see the Streambots Quickstart to get started.

For example, you can intercept messages from a stream, analyze their contents looking for certain keywords, and send messages back to the RTM stream.

Note: During the beta phase, streambots only communicate with RTM and cannot call out to external URLs. Streambots cannot write to files or access the file system, spawn processes, open sockets, or perform any I/O beyond communicating with RTM by publishing and subscribing to channels.

The Satori Streambot SDK also provides a local streambot executor, which allows you to run, debug, and test your streambot on your local machine using either the command line or a Java IDE (e.g. IntelliJ).

To help you get started quickly with your development, Satori provides a starter streambot called the satori-bot-example, which is the basis for this Streambot Tutorial, and can be used as a starting point for creating your own streambot. This example includes everything you need to get started, including the SDK dependencies.

Streambot Instances

A streambot is instantiated based on your implementation of the Bot interface, which you will compile and package into a JAR file using the gradle botJar task (described in Deploy). Once you have created a Project on the Dev Portal, you upload that JAR file, which is then used to instantiate as many instances of your streambot as you like; this way you can simultaneously process different RTM streams. Each streambot instance is a running streambot in the Satori environment; each running streambot subscribes to channels and processes the messages it receives.

The SDK already includes a few dependencies so they need not be included in your streambot JAR file.

Using the Dev Portal, you create streambot instances and then deploy, start, and stop them. You can also view the log for each instance.

Streambot Instance Configuration

Each streambot instance must be configured to specify the channels to which it will subscribe. Each channel may have an optional specification in which you define a custom query that filters the messages (see Views). Additionally, for your convenience, you can create custom configuration JSON parameters that are passed to the streambot instance when it is launched; your custom code can take advantage of these parameters.

There are two ways you can run a streambot:

  • You can run a streambot from the Dev Portal. In this case, you must specify the configuration properties for the streambot instance in the Dev Portal UI when instantiating the streambot.
  • You can run a streambot locally. In this case you must manually provide a JSON configuration file that specifies the configuration properties.

The satori-bot-example includes a configuration file (config-example.json) that can be used as a starting point for your development.

For your private data, each streambot instance must use the appkey and endpoint (host) from the project on which you deployed the streambot on the Dev Portal. For Open Data, your streambot must use the open data channel endpoint and the Open Data Appkey.

Streambot Versions

A streambot version is a specific version of the JAR file you compiled. Streambot versions are defined in the Dev Portal, where you can deploy multiple versions.

The number of versions that your organization can create depends on your account type.

The Streambots page of the Dev Portal includes a list of all streambots, versions, and instances for your organization.

Using the Streambot API

The Satori Java Streambot SDK provides a Bot interface, which you must implement in order to create your own streambot. You must provide implementations of the onSetup() and onSubscriptionData() methods:

Implementation of Bot Interface

/**
 * Bot custom logic
 */
public class MyBot implements Bot {

  // Key of the custom configuration entry that names the output channel
  private static final String CONFIG_KEY = "outputChannel";

  // Stored in a field so that onSubscriptionData() can use it
  private String outputChannel;

  @Override
  public void onSetup(BotContext botContext) {
    final JsonObject object = botContext.getCustomConfiguration().getAsJsonObject();
    // process bot configuration
    …
    outputChannel = object.get(CONFIG_KEY).getAsString();
  }

  @Override
  public void onSubscriptionData(BotContext ctx, RtmSubscriptionData messages) {
    for (JsonElement msg : messages.getMessages()) {
      // process one message
    }
  }

}

Initializing a Streambot

Your onSetup() implementation is automatically invoked and performs initialization operations when the streambot starts. As shown in the example, you can call the BotContext parameter’s getCustomConfiguration() method to access the streambot’s custom configuration parameters.

The custom configuration can be any data, represented as a JSON object, that you would like available to a streambot instance when it starts. For a simple dispatcher streambot that republishes messages from one channel to another, the configuration might consist of only the name of the channel to which it will republish. For example, the following line specifies example.out as the channel to which the streambot will republish:

{"outputChannel":"example.out"}

This channel name can be retrieved as shown in the following code, where the outputChannel key/value pair is accessed in onSetup() by specifying the CONFIG_KEY constant:

@Override
public void onSetup(BotContext botContext) {
  final JsonObject object = botContext.getCustomConfiguration().getAsJsonObject();
  // process bot configuration
  …
  // outputChannel is a field of the bot class, so onSubscriptionData() can use it later
  outputChannel = object.get(CONFIG_KEY).getAsString();
}
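
One way to guard the lookup shown above is to check the value at setup time, so that a missing or blank channel name is reported when the streambot starts rather than on the first publish. The following is a minimal sketch under stated assumptions: it uses a plain Map as a stand-in for the parsed custom configuration so that it runs without the SDK, and ConfigSketch and requireString are hypothetical names for illustration, not part of the Satori SDK.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Satori SDK.
final class ConfigSketch {
    // Same key as in the {"outputChannel":"example.out"} example
    static final String CONFIG_KEY = "outputChannel";

    // Returns the string stored under key, or throws if the entry
    // is missing, is not a string, or is blank.
    static String requireString(Map<String, Object> config, String key) {
        Object value = config.get(key);
        if (!(value instanceof String) || ((String) value).trim().isEmpty()) {
            throw new IllegalArgumentException("Missing or empty configuration key: " + key);
        }
        return (String) value;
    }

    public static void main(String[] args) {
        // Stand-in for the custom configuration passed to the streambot instance
        Map<String, Object> config = new HashMap<>();
        config.put(CONFIG_KEY, "example.out");
        System.out.println(requireString(config, CONFIG_KEY));
    }
}
```

In a real streambot, the same check could be applied to the JsonObject returned by getCustomConfiguration() before calling getAsString().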

Processing Messages

Your onSubscriptionData() implementation is automatically invoked whenever messages from the RTM stream are received. This method processes the messages that the streambot receives from its subscribed channels, and may include logic to republish all or part of those messages to channels in its project. These two operations are described in the following subsections.


Handling Received Messages

The primary function of a streambot is to subscribe to a channel and process the messages it receives from that channel. The streambot can then publish messages to any channel in its project.

When RTM delivers messages from a subscribed channel, it does so in batches represented as arrays of messages. It sends each batch in a Protocol Data Unit (PDU) called a subscription data PDU.

The RtmSubscriptionData parameter passed to your onSubscriptionData() implementation contains the messages the streambot receives. Your onSubscriptionData() method can iterate through those messages using the RtmSubscriptionData.getMessages() method and perform custom logic on them:

public void onSubscriptionData(BotContext ctx, RtmSubscriptionData messages) {
    for (JsonElement msg : messages.getMessages()) {
        // do something with msg
    }
}

Publishing Messages

To publish messages, call publish() on the object returned by the BotContext.getRtmProxy() method, which is the streambot’s live connection to RTM. You can optionally receive an acknowledgement from RTM indicating whether the operation succeeded by specifying Ack.YES or Ack.NO as a parameter to the publish() method.

In this example, a dispatcher streambot simply republishes the data it receives to the output channel:

public void onSubscriptionData(BotContext ctx, RtmSubscriptionData messages) {
    for (JsonElement msg : messages.getMessages()) {
      try {
        ctx.getRtmProxy().publish(outputChannel, msg, Ack.NO);
      } catch (InterruptedException e) {
        logger.error("Execution interrupted", e);
        Thread.currentThread().interrupt();
      } catch (RtmException e) {
        logger.error("RTM problem: ", e);
      }
    }
}
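
The dispatcher above reads outputChannel inside onSubscriptionData(), so the value obtained in onSetup() has to be kept in a field of the bot class. The sketch below illustrates that setup-then-process split together with the keyword filtering mentioned in the Overview. It is self-contained and does not use the SDK: Publisher is a hypothetical stand-in for the RTM proxy, messages are plain strings rather than JSON elements, and KeywordDispatcher, setup, and onBatch are names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for the publishing side of the RTM proxy; not SDK API.
interface Publisher {
    void publish(String channel, String message);
}

// Sketch of a dispatcher that republishes only messages containing a keyword.
final class KeywordDispatcher {
    private String outputChannel; // set once during setup, reused for every batch
    private String keyword;       // lower-cased for case-insensitive matching

    // Plays the role of onSetup(): keep configuration values in fields.
    void setup(String outputChannel, String keyword) {
        this.outputChannel = outputChannel;
        this.keyword = keyword.toLowerCase(Locale.ROOT);
    }

    // Plays the role of onSubscriptionData(): process one batch of messages
    // and return how many of them were republished.
    int onBatch(List<String> messages, Publisher publisher) {
        int published = 0;
        for (String msg : messages) {
            if (msg.toLowerCase(Locale.ROOT).contains(keyword)) {
                publisher.publish(outputChannel, msg);
                published++;
            }
        }
        return published;
    }

    public static void main(String[] args) {
        KeywordDispatcher bot = new KeywordDispatcher();
        bot.setup("example.out", "alert");

        // Collect published messages in a list instead of sending them to RTM
        List<String> sent = new ArrayList<>();
        int count = bot.onBatch(
                Arrays.asList("ALERT: disk full", "heartbeat"),
                (channel, msg) -> sent.add(channel + " <- " + msg));
        System.out.println(count + " " + sent);
    }
}
```

Passing the publisher as a parameter keeps the filtering logic testable without a live RTM connection; in a real streambot the same loop would call ctx.getRtmProxy().publish(...) as shown in the dispatcher example.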

Additional SDK Features

CLI Tool

The Satori CLI provides you with a convenient command-line interface (CLI) to publish and subscribe to streambot channels when running your streambot locally, and also lets you record and replay channel message streams.

Use pip to install the Satori CLI:

pip install satori-rtm-cli

You must have Python installed in order to run the pip command.

See the Quickstart for an example of how to use the Satori CLI to test your deployed streambot instances.

Logging

The SDK provides a logging framework for generating log events in your streambot. When you log an event, the logger posts the message to the streambot's log, which is available in the Dev Portal on the Debug tab of the streambot instance.

Use the BotContext.getLogger() method to retrieve an instance of the logger. The following code sample shows a log event in satori-bot-example:

import org.slf4j.Logger;
...
private static final Logger logger = BotContext.getLogger(MyBot.class);
...
logger.info("Set up done for example bot");

Logged events such as this one appear on the Debug tab of the streambot instance in the Dev Portal.

To see past logs using the Satori CLI, enable history on the satori.bot.logs channel.