Satori Docs

Your definitive guide to subscribing and publishing live data

Subscribing Direct link

Subscription is a more involved operation compared to publish because it is a persistent “link” between the client and the channel: a client subscribes to a channel to continually receive incoming messages published to that channel. The messages are pushed to the subscriber, to contrast it with pull based approaches where a client constantly polls the server to retrieve new data.

RTM supports a number of capabilities associated with subscriptions: historical messages from the past, stream continuity using position, flags to request specific behaviors in difficult edge-case situations, SQL-like evaluation on the stream of messages and more.  

To subscribe to a channel, a client sends a subscription request; RTM confirms the request, remembers the subscription associated with the client connection and starts sending the messages from the channel. In case of a failure (e.g. unauthorized attempt to subscribe), RTM replies with an error.  

With RTM SDKs you call either subscribe or createSubscription method, depending on the SDK language you are using. This converts to a Subscribe PDU on the wire.


Full source code for all RTM SDK examples is available on github:

JavaScript | Java | Python | C | C# | Go

RTM SDK code snippets assume the RTM client instance and connecting code as shown in the Setup section.

Subscribing and receiving messages Direct link

This section shows not only how to subscribe to a channel but also how to handle incoming messages, subscription related state changes and errors. Further examples build on it and show only the necessary lines to demonstrate a different aspect of subscribing.

The simplest (and typical) way to subscribe is for receiving all the messages as they arrive to a channel: a client specifies only the channel name. Once the subscription is established, RTM starts forwarding new messages as soon as they arrive to the channel.

The example below subscribes to “animals” channel.

var channel = client.subscribe('animals', RTM.SubscriptionMode.SIMPLE);

/* set callback for state transition */
channel.on('enter-subscribed', function () {
  console.log('Subscribed to: ' + channel.subscriptionId);
});

channel.on('leave-subscribed', function () {
  console.log('Unsubscribed from: ' + channel.subscriptionId);
});

/* set callback for PDU with specific action */
channel.on('rtm/subscription/data', function (pdu) {
  pdu.body.messages.forEach(function (msg) {
    console.log('Got animal ' + msg.who + ': ' + JSON.stringify(msg));
  });
});

channel.on('rtm/subscribe/error', function (pdu) {
  console.log('Failed to subscribe. RTM replied with the error ' +
      pdu.body.error + ': ' + pdu.body.reason);
});

channel.on('rtm/subscription/error', function (pdu) {
  console.log('Subscription failed. RTM sent the unsolicited error ' +
      pdu.body.error + ': ' + pdu.body.reason);
});

In case of a simple subscription (no view, no subscription id is specified), the channel name is the subscription id.

Unsubscribing Direct link

You can request to unsubscribe from a channel at any time (without disconnecting or losing other subscriptions) by specifying the subscription id you want to stop (if the subscription id was not used, it defaults to the channel name). Unsubscribe operation is the same regardless of how the subscription has been established (with or without view, or any other variations). Below example unsubscribes from the “animals” channel (subscription id).

client.unsubscribe('animals');

Changing subscription Direct link

After establishing a subscription, you may want to change some of its parameters: for instance, to update a view sql, period or any other subscription field. By default, RTM rejects a subscription request if there is already an active subscription with same subscription id (channel name when the id was not specified). You can change a subscription in one of the two ways:

  • Resubscribe by sending unsubscribe followed by a new subscribe request

  • Send a new subscribe request with a force flag, which directs RTM to replace a pre-existing subscription with the same subscription id (if any)

When using the first approach, i.e. resubscribing, you do not need to wait for a reply before sending a new subscribe: this will avoid a delay in establishing a new subscription; unsubscribe and subscribe responses from RTM will arrive in the order of the requests. 

The second approach is an optimization to avoid additional unsubscribe request: simply send a new subscribe request with the same subscription id and true for the force flag.

Wire protocol forcing a new subscription

// send a subscribe request
// "animals" becomes a subscription id
{
  "action": "rtm/subscribe", "id": "1",
  "body": { "channel": "animals"}
}

// send another subscribe for the same subscription id
// This replaces the original subscription receiving all channel messages
// with a viewed subscription receiving message count every 2 seconds 
{
  "action": "rtm/subscribe", "id": "2",
  "body": {
    "subscription_id": "animals",
    "filter": "select count(*) as count from animals", "period": 2,
    "force": true //if this flag is not set, “already_subscribed” error
  }
}

//received ok for the fist subscribe (subscription established)
{
  "action": "rtm/subscribe/ok", "id": "1",
  "body": {
    "position": "1490808173:258045738",
    "subscription_id": "animals"
  }
}

//received ok for the second forced subscribe
{
  "action": "rtm/subscribe/ok", "id": "2",
  "body": {
    "position": "1490808173:258045805",
    "subscription_id": "animals"
  }
}

// receiving messages for the updated subscription now
{
  "action": "rtm/subscription/data",
  "body": {
    "position": "1490808173:258072800",
    "subscription_id": "animals",
    "messages": [{ "count": 7}]
  }
}

... with a view Direct link

view (formerly filter) field in the subscribe request allows a subscriber to specify an arbitrary SQL expression (as supported by the Stream SQL), which RTM applies on the message stream before it sends the resulting data to the subscriber. The power of the views comes from the generalized SQL ability and that it is individual to each subscriber: you can subscribe with a different view at any time, fine-tuning the data you receive for the current need and only your subscription is affected. Use case and possibilities are many.

The example below subscribes with a view of only zebras from the “animals” channel. RTM will not forward any other messages, those not meeting the criteria in SQL. Subscription id is a client-generated value; it can be used to unsubscribe. We use “zebras” as a subscription id.

Note: the code snippet assumes the RTM client instance and and the subscription callbacks have been set up as shown earlier in “Subscribe to a channel” example.

var subscrtiptionId = 'zebras';
var channel = client.subscribe(subscrtiptionId, RTM.SubscriptionMode.SIMPLE, {
  filter: 'SELECT * FROM `animals` WHERE who = "zebra"'
});

... with multiple views Direct link

The example below shows how to establish several concurrent subscriptions to the same channel, where each subscription has its own view to receive different data.

Because there is multiple subscriptions to the same channel, the channel name cannot be used to identify subscriptions and RTM requires clients to specify subscription_id for viewed subscriptions. The example uses “zebras” and “stats” for the subscription ids. You can use these subscription ids to distinguish incoming messages, state changes and errors in the subscription callbacks if you share the same callbacks between the subscriptions.

In this example, the client subscribes with the following views:

Subscription id

View (Stream SQL)

Description

zebras

SELECT * FROM `animals` WHERE who = 'zebra'

Receive only zebras from the animals channel.

stats

SELECT count(*) as count, who FROM `animals` GROUP BY who

Receive message counts by animal kind. RTM view aggregates and delivers counts every second (default period).

Note: the code snippet assumes the RTM client instance and and the subscription callbacks have been set up as was shown earlier. Both subscriptions are established over the same RTM client.

function onSubscriptionData(pdu) {
  pdu.body.messages.forEach(function (msg) {
    if (pdu.body.subscription_id == "zebras") {
      console.log('Got a zebra:', JSON.stringify(msg));
    } else {
      console.log('Got a count:', JSON.stringify(msg));
    }
  });
}

var stats = client.subscribe('stats', RTM.SubscriptionMode.SIMPLE, {
  filter: 'SELECT count(*) as count, who FROM `animals` GROUP BY who',
});
stats.on('rtm/subscription/data', onSubscriptionData);

var zebras = client.subscribe('zebras', RTM.SubscriptionMode.SIMPLE, {
  filter: 'SELECT * FROM `animals` WHERE who = "zebra"',
});
zebras.on('rtm/subscription/data', onSubscriptionData);

... with position Direct link

Instead of starting a subscription from “now”, you can specify a channel position from which to start a subscription. Position is an RTM-generated value that identifies a specific location in the channel. See Channel Position in RTM API and use case explanation in Concepts later in this page.

Position can be applies both in a regular and viewed subscribe request. In case of a subscribe with the view field, the SQL expression from the view will be evaluated on the messages starting at the requested position.

RTM SDKs can automatically track and resubscribe with position (depending on the subscription mode chosen) and also expose a position parameter when subscribing.

Wire

{  
   "action":"rtm/subscribe",
   "id":"20",
   "body":{  
      "channel":"animals",
      "position":"1491076785:3"
   }
}

... with history Direct link

Using count

The example below shows how to subscribe to a channel starting with several messages from the past. See Channels History section in RTM API.  

In this example the client will receive the last 10 messages from the channel history immediately (if available) and then it continues receiving incoming messages as they arrive, as usual.

Note: The SDK code snippets assume the RTM client instance and and the subscription callbacks have been set up as was shown earlier.

var channelName = 'animals';
var channel = client.subscribe(channelName, RTM.SubscriptionMode.SIMPLE, {
  history: { count: 10 },
});

Using age.

The next example subscribes starting with messages published 60 seconds ago or less  and then it continues receiving incoming messages as they arrive, as usual.

var channelName = 'animals';
var channel = client.subscribe(channelName, RTM.SubscriptionMode.SIMPLE, {
  history: { age: 60 /* seconds */ },
});

... with fast-forward (advanced) Direct link

Fast-forward flag in the subscribe request affects how RTM handles a subscription in case of “out_of_sync” condition, which refers to a severe client “lag” relative the channel traffic. This is an advanced topic and is covered in the Concepts section and also in RTM API Subscribe. This section is for illustration by example.  

Wire example: subscribe request with fast-forward​

Below example shows a request to subscribe with fast-forwarding. With RTM SDKs, you control fast-forwarding flag via subscription mode passed a parameter when subscribing.

{  
   "action":"rtm/subscribe",
   "id":"20",
   "body":{  
      "channel":"animals",
      "fast_forward":true
   }
}

Wire example: fast-forwarded subscription info from RTM

If the subscription with fast-forwarding becomes “out_of_sync”, RTM informs of the fast-forward and keeps the subscription active.

{
  "action":"rtm/subscription/info",
  "body":{
    "subscription_id":"animals",
    "info":"fast_forward",
    "reason":"fast forwarded",
    "position":"1491076785:3",
    "missed_message_count":10
   }
}

Wire example: out_of_sync subscription error from RTM

If the subscription without fast-forwarding becomes “out_of_sync”, RTM pushes a subscription error and stops this subscription.

{
  "action":"rtm/subscription/error",
  "body":{
    "error":"out_of_sync",
    "reason":"out of sync error",
    "position":"1491076785:3",
    "subscription_id":"animals"
   }
}

Concepts Direct link

About subscription id

The subscription id is used to uniquely identify the subscription. See Subscribe PDU in RTM API for detailed description.

You can have multiple subscriptions to the same channel. The subscription ids should be different for each subscription and they can be used to identify the subscription to which a received data message belongs to.

About subscribing at position

Channel position is an RTM-generated value that identifies a specific location in a channel. See Channel Position in RTM API for more details and which RTM PDUs include the position field.

Your application can keep track of the position value included with the latest subscription data to avoid skipping messages published to the channel while the application was intermittently unsubscribed for whatever reason (such as due to a need to reconnect or other cases). After a disconnect or other failure, the application can re-subscribe to the channel specifying the saved position value. This allows the application to make sure it processes all the messages; in other words, the stream continuity is preserved for the subscription.

Another use case for tracking and subscribing with a position value is to maintain continual channel stream processing between two applications (clients). Consider a scenario where an application is receiving and processing data from the channel and some time later it needs to transfer the stream processing to another application, perhaps running on a different machine. You can pass the next position from the first application to the second one. The second application then picks up data processing by subscribing to the channel starting at the position where the first one left off. This process ensures an uninterrupted channel data stream between the two machines.

About subscribing with views

Subscribing with a view allows channel subscribers to shift the overhead of processing or filtering the messages to the RTM service. For a subscriber with a view, RTM evaluates specified SQL expression (specified in the view field of the subscribe request) on the messages and forwards the resulting data to the subscriber. 

As an example, consider a channel providing messages with vehicle locations in an entire country. Clients can request a viewed subscription on this channel to receive only a subset of vehicles in a specific state, city or even an exact map region as visible on user screen. Imagine the overhead of doing this on the client side instead, especially on mobile clients with metered network.

RTM allows multiple subscriptions to the same channel using different views. This opens up opportunities to receive channel data in different flavors over the same connection.

Another important capability of a viewed subscription is receive aggregated data partitioned over periods of time:  period field in the subscribe request. When period is specified, RTM runs the sql query on all the channel messages published for that period of time and forwards the result in a single message. See Views for more details

Subscriber lag, out-of-sync and fast-forward (advanced)

Fast-forward flag in the subscribe request affects how RTM handles a subscription in a rare but sometimes occurring in the real world scenario: when a subscription is not catching up processing incoming channel message rate, typically due to a slow client network. When RTM expires the message (when the message age or count exceeds the channel history) while there is an active subscriber which still has not processed it, RTM detects this condition and notifies such subscriber. RTM protocol refers to this condition as “out-of-sync”.

When out-of-sync is detected, RTM can either force unsubscription (after sending the error) so that the next steps are entirely up to the client. Or RTM can keep such subscription active but the subscription would have to be fast-forwarded to the channel position with non-expired messages. Fast-forward flag instructs RTM to do the latter (RTM notifies the subscriber and includes the count of messages skipped for this subscription).

Unsubscription due to out-of-sync is preferred to increase visibility and to have the application handling further steps.

Some of the measures to consider taking on unsubscription triggered by out_of_sync:    

  • Subscribe with a view constructed to reduce received channel traffic

  • Subscribe again after a pause and from the current position.

  • Stay unsubscribed from less important/optional channels to give all network capacity to more important channels.

  • Re-synchronize application state.

Note that this behavior is on a subscription basis. It is possible for a client to experience out_of_sync for one subscription, while having other concurrent subscriptions unaffected.

When using fast-forward flag, the application needs no dedicated handling for out-of-sync but be aware of your application behavior when receiving stale message and when they get skipped.

When a subscription is lagging, it means increasingly delayed messages in terms of when the message was originally published to the channel and when it is finally received by the lagging subscriber. Application design should consider integrating timestamps if it wants to detect lagging subscription early on.

Applications should at least log and track out-of-sync occurrences to correctly diagnose customer reported issues.