Satori Docs

Your definitive guide to subscribing and publishing live data

Streambot™ API for Java

This article provides information on the methods that you use to implement Streambots™ in Java.

The Satori Java Streambot™ SDK provides a Bot interface that you implement to create a Streambot™. Your implementation must define the onSetup and onSubscriptionData methods.

The following methods are described in this article.

To initialize a Streambot™:

  • onSetup
  • getCustomConfiguration

To receive messages:

  • onSubscriptionData

To publish messages:

  • getRtmProxy

Initialize a Streambot™

onSetup Method

To initialize a Streambot™, implement the onSetup method as shown in the code sample below.

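import com.google.gson.JsonObject;
// Bot and BotContext come from the Satori Java Streambot™ SDK.

/**
 * Streambot™ custom logic
 */
public class MyBot implements Bot {

  @Override
  public void onSetup(BotContext botContext) {
    final JsonObject object = botContext.getCustomConfiguration().getAsJsonObject();

    // process Streambot™ configuration
    // ...
  }

  // onSubscriptionData is described later in this article.
}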

When the Streambot™ starts, your onSetup implementation is invoked automatically. Use it to perform any initialization the Streambot™ needs.

getCustomConfiguration Method

As shown in the example, you can call the BotContext parameter’s getCustomConfiguration method to access the Streambot™ custom configuration parameters.

The custom configuration can be any data, represented as a JSON object, that you would like to have available to a Streambot™ instance when it starts. For a simple dispatcher Streambot™ that republishes messages from one channel to another, the configuration might consist of only the name of the channel to which it will republish. For example, the following line specifies example.out as the channel to which the Streambot™ will republish:

{"outputChannel":"example.out"}

The following code retrieves this channel name in onSetup by looking up the outputChannel key through the CONFIG_KEY constant. If the key is absent from the configuration, the code falls back to DEFAULT_OUTPUT_CHANNEL.

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

public class MyBot implements Bot {
  private static final Logger logger = BotContext.getLogger(MyBot.class);

  private static final String CONFIG_KEY = "outputChannel";
  private static final String DEFAULT_OUTPUT_CHANNEL = "output";

  private String outputChannel;

  @Override
  public void onSetup(BotContext botContext) {
    final JsonObject object = botContext.getCustomConfiguration().getAsJsonObject();
    // Fall back to the default channel when the key is absent from the configuration.
    final JsonElement value = object.get(CONFIG_KEY);
    outputChannel = value == null ? DEFAULT_OUTPUT_CHANNEL : value.getAsString();
    logger.info("Setup done for example Streambot™");
  }

  // onSubscriptionData is omitted here.
}
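The fallback logic above can be exercised outside a running Streambot™. The following is a minimal sketch under stated assumptions: a plain java.util.Map stands in for the parsed JSON configuration, and the ConfigFallbackSketch class and its resolveOutputChannel method are hypothetical names that are not part of the Satori SDK.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a Map plays the role of the parsed JSON configuration.
// Nothing in this class is part of the Satori SDK.
class ConfigFallbackSketch {
    static final String CONFIG_KEY = "outputChannel";
    static final String DEFAULT_OUTPUT_CHANNEL = "output";

    // Returns the configured channel, or the default when the key is absent.
    static String resolveOutputChannel(Map<String, String> config) {
        String value = config.get(CONFIG_KEY);
        return value == null ? DEFAULT_OUTPUT_CHANNEL : value;
    }

    public static void main(String[] args) {
        Map<String, String> configured = new HashMap<>();
        configured.put(CONFIG_KEY, "example.out");
        // Key present: the configured value is used.
        System.out.println(resolveOutputChannel(configured));
        // Key absent: the default channel name is used.
        System.out.println(resolveOutputChannel(new HashMap<>()));
    }
}
```

Keeping the lookup in a small method of this kind makes it possible to check the default behavior without deploying the Streambot™.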

Receive and Process Messages

The primary function of a Streambot™ is to subscribe to a channel and process the messages it receives from that channel. The Streambot™ can then publish the messages to any channel in its project.

After initialization is complete, the Streambot™ can receive and process messages as follows.

onSubscriptionData Method

Your Bot class must implement the onSubscriptionData method, which is invoked automatically whenever messages arrive from a subscribed RTM channel. This method processes the messages that the Streambot™ receives and may include logic to republish all or part of them to channels in its project.

import com.google.gson.JsonElement;

/**
 * Streambot™ custom logic
 */
public class MyBot implements Bot {
  // ...

  @Override
  public void onSubscriptionData(BotContext ctx, RtmSubscriptionData messages) {
    for (JsonElement msg : messages.getMessages()) {
      // process one message
    }
  }
}

RTM delivers the messages from a subscribed channel in batches, each represented as an array of messages. Each batch arrives in a Protocol Data Unit (PDU) called a subscription data PDU.

The RtmSubscriptionData parameter passed to your onSubscriptionData implementation contains the messages the Streambot™ receives. Your onSubscriptionData method iterates through those messages using the RtmSubscriptionData.getMessages method and executes custom logic on each of them.

public void onSubscriptionData(BotContext ctx, RtmSubscriptionData messages) {
    for (JsonElement msg : messages.getMessages()) {
        // do something with msg
    }
}
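The per-batch iteration described above can be illustrated without the SDK. This is only a sketch: plain strings stand in for messages, a java.util.List stands in for one batch, and the BatchProcessingSketch class with its onBatch and getProcessed methods is hypothetical. Uppercasing each message is a placeholder for whatever custom logic a real Streambot™ would apply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-ins: strings play the role of messages and a List plays
// the role of one batch. Nothing in this class is part of the Satori SDK.
class BatchProcessingSketch {
    private final List<String> processed = new ArrayList<>();

    // Called once per batch, mirroring the shape of onSubscriptionData.
    void onBatch(List<String> batch) {
        for (String msg : batch) {
            // Placeholder for custom per-message logic.
            processed.add(msg.toUpperCase(Locale.ROOT));
        }
    }

    List<String> getProcessed() {
        return processed;
    }

    public static void main(String[] args) {
        BatchProcessingSketch bot = new BatchProcessingSketch();
        // Two batches arrive one after the other; each is handled in a single call.
        bot.onBatch(Arrays.asList("a", "b"));
        bot.onBatch(Arrays.asList("c"));
        System.out.println(bot.getProcessed());
    }
}
```

The handler is called once per batch rather than once per message, so any state that must survive across batches belongs in a field of the class, as with the processed list here.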

Publish Messages

getRtmProxy Method

To publish messages, call the BotContext.getRtmProxy method, which returns the Streambot™ live connection to RTM, and then call publish on that connection. The last parameter of the publish method controls whether RTM acknowledges the operation: specify Ack.YES to request an acknowledgement indicating whether the publish succeeded, or Ack.NO to skip it. Ack.NO is the recommended option because it allows quicker publishing.

In the following example, a dispatcher Streambot™ simply republishes each message it receives to the output channel that was configured in onSetup.

public void onSubscriptionData(BotContext ctx, RtmSubscriptionData messages) {
   for (JsonElement msg : messages.getMessages()) {
     try {
       ctx.getRtmProxy().publish(outputChannel, msg, Ack.NO);
     } catch (InterruptedException e) {
       logger.error("Execution interrupted", e);
       Thread.currentThread().interrupt();
     } catch (RtmException e) {
       logger.error("RTM problem: ", e);
     }
   }
}
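Because the try/catch block sits inside the loop, a failure on one message does not stop the remaining messages in the batch from being published. The following sketch illustrates that design choice under stated assumptions: the Publisher interface, the DispatchSketch class, and the dispatch method are hypothetical stand-ins and not part of the Satori SDK, and strings stand in for messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these types belong
// to the Satori SDK.
class DispatchSketch {
    // Stand-in for the publishing side of the RTM connection.
    interface Publisher {
        void publish(String channel, String message) throws Exception;
    }

    // Republishes every message in the batch and returns how many publish calls failed.
    static int dispatch(Publisher publisher, String channel, List<String> batch) {
        int failures = 0;
        for (String msg : batch) {
            try {
                publisher.publish(channel, msg);
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers can observe it.
                Thread.currentThread().interrupt();
                failures++;
            } catch (Exception e) {
                // A real Streambot would log the error here; the loop continues.
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        // A publisher that rejects empty messages, to simulate a failed publish.
        Publisher flaky = (channel, message) -> {
            if (message.isEmpty()) {
                throw new IllegalStateException("empty message");
            }
            delivered.add(channel + ":" + message);
        };
        int failures = dispatch(flaky, "example.out", Arrays.asList("a", "", "b"));
        System.out.println(delivered + " failures=" + failures);
    }
}
```

Moving the try/catch outside the loop would instead abandon the rest of the batch at the first error, which may or may not be what a given Streambot™ needs.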