Satori Docs

Your definitive guide to subscribing and publishing live data

Streamview and Stream SQL

Streamviews are also known as filters.

A streamview is a type of subscription that filters, transforms, or aggregates messages after RTM gets them from the channel but before it sends them to your client. You specify the actions to take on each message using stream SQL, a Satori-specific SQL dialect that supports a subset of the SQL:2003 features. RTM applies the stream SQL to messages published to a channel (channel messages) and sends the results to your client (returned messages).

You typically use a streamview to filter messages you receive from RTM, using selection criteria or aggregation statistics such as a count of messages. A streamview can also merge messages received in a specified time period, or have RTM perform other transformations using stream SQL functions.

In Satori documentation, SQL refers to stream SQL unless otherwise noted.

Types of subscriptions

Clients that subscribe to a channel without specifying a streamview receive all the messages published to the channel. In other words, RTM sends all the channel messages as returned messages.

Alternatively, clients can subscribe and specify a streamview by specifying a filter, which is a stream SQL expression. If a client uses the RTM API, it specifies the stream SQL in the filter field of the subscribe PDU. If a client uses one of the RTM SDKs, the SDK provides a mechanism for setting the stream SQL.
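As a rough illustration of the RTM API path, the sketch below builds a subscribe PDU that carries stream SQL in its filter field. Only the filter field comes from this guide; the action string, the id and subscription_id fields, and the helper name are assumptions made for the example, so check the RTM API reference for the exact PDU layout.

```python
import json

def build_subscribe_pdu(subscription_id, stream_sql, request_id=1):
    """Return a JSON-encoded subscribe PDU carrying a stream SQL filter.

    The "filter" field is the one this guide describes. The action name,
    "id", and "subscription_id" are assumptions for illustration only.
    """
    pdu = {
        "action": "rtm/subscribe",   # assumed action name
        "id": request_id,            # assumed request identifier
        "body": {
            "subscription_id": subscription_id,  # assumed field name
            "filter": stream_sql,
        },
    }
    return json.dumps(pdu)

pdu_text = build_subscribe_pdu(
    "positive-values",
    "SELECT * FROM `my-channel` WHERE `my-field` > 0",
)
print(pdu_text)
```

If you use an SDK instead, the SDK's own mechanism for setting the stream SQL takes the place of this hand-built PDU.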

Your streamview affects only your subscription, even if your stream SQL is the same as that specified by another client. In your client, you can have multiple streamviews for the same channel or for different channels.

Streamviews operate on structured messages. Messages in other formats are dropped when RTM applies the streamview.

Types of streamviews

You can create two types of streamviews: 

  • Simple: RTM applies simple streamviews to each channel message. For a simple streamview, RTM applies the stream SQL to each individual message and immediately sends the result to your client when the WHERE expression evaluates to true. The following is an example of the stream SQL for a simple streamview: 
    SELECT * FROM `my-channel` WHERE `my-field` > 0
  • Aggregate: RTM partitions channel messages by time period, applies the stream SQL, and returns an aggregated result message for each partition. The following is an example of the stream SQL for an aggregate streamview: 
    SELECT * FROM channel GROUP BY a
    See Aggregate Streamview to learn more about aggregate streamviews.

Streamviews and channel history

When you first subscribe using a streamview, RTM applies the stream SQL to all previous messages. You can use this to accumulate the state that's present in the channel at the time you subscribe. For example, you can use stream SQL to aggregate the count of important messages that arrived before you subscribed.

Database SQL and Stream SQL

The following table maps database SQL concepts to stream SQL concepts.

Database | RTM
table | channel
column | field in a message
row | message
ROWID | message position (ROWID is available as a pseudo field; see Stream SQL)
column type | n/a. RTM has no control over the data types of published messages.

Stream SQL

Stream SQL Syntax

This is the overall syntax of stream SQL:

SELECT select_expr [AS alias][, select_expr [AS alias]]
   FROM (channel_name | subquery)
      [WHERE where_expression]
      [GROUP BY group_by_expr[, group_by_expr]]
      [HAVING where_condition]
  • select_expr: A list of field names, functions, a wildcard, and other accepted SELECT expressions. Stream SQL supports nested field names.
  • channel_name: A single channel name 
  • where_expression: A logical expression that RTM evaluates for each channel message. If the expression evaluates to true, RTM sends the message to you. If you don't specify a WHERE clause, RTM selects every message. You can use this to apply a transformation to each message.
  • group_by_expr: One or more field names, aliases, or field positions

Escaping

Stream SQL supports UTF-8. You must escape channel and field names containing characters other than [a-zA-Z0-9_] with backticks (`), e.g. `my-channel` or `my.field`

Specify string literals in either single or double quotes. If the string delimiter character is used inside the string, it must be escaped using a backslash, for example "\"Hello.\"".
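If you assemble stream SQL in client code, the escaping rules above can be wrapped in two small helpers. This is a minimal sketch: the function names are made up for the example, quote_name treats its argument as one complete name (so a dotted path such as item.name would be quoted as a single field), and the handling of backticks inside names or backslashes inside strings is not covered because this guide does not specify it.

```python
import re

# Names made only of these characters need no escaping.
_PLAIN_NAME = re.compile(r"[a-zA-Z0-9_]+")

def quote_name(name):
    """Wrap a channel or field name in backticks when it needs escaping."""
    if _PLAIN_NAME.fullmatch(name):
        return name
    return "`" + name + "`"

def quote_string(value):
    """Return a double-quoted literal with embedded double quotes escaped."""
    return '"' + value.replace('"', '\\"') + '"'

sql = "SELECT * FROM {} WHERE {} = {}".format(
    quote_name("my-channel"), quote_name("status"), quote_string('say "hi"')
)
print(sql)
```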

Select All Messages in a Channel

If you don't specify a WHERE clause, RTM sends you every JSON or CBOR message.

SELECT * FROM `my-channel`

Filter Messages on a Single Value

To receive only the messages matching a certain condition, use the WHERE clause. RTM only sends you the messages for which the WHERE expression evaluates to true.

SELECT * FROM `my-channel` WHERE `my-field` > 0

Filter Messages on Multiple Values

Use the IN operator for the WHERE clause to specify multiple values for a specific field. 

The following example selects all messages where the value of field a is 'US', 'Spain', or 'Brazil':

SELECT * FROM `my-channel` WHERE a IN ('US', 'Spain', 'Brazil')

Filter Messages with Pattern Matching

Use the LIKE operator for the WHERE clause to match string field values to a pattern. The LIKE clause can contain the following wildcard characters:

  • '%': Any number of characters match.

  • '_': Any single character matches at the given position within a field's value.

The following example selects all messages where the values of field a contain 't' in the second position, regardless of the length of the value.

SELECT * FROM `my-channel` WHERE a LIKE '_t%'

In the preceding example, '_t%' has this effect:

'_': Matches any single character in the first position

't': Matches the character t in the second position

'%': Matches any number and type of characters in the remaining part of the string.
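To check a pattern locally before subscribing, the two wildcards can be emulated by translating a LIKE pattern into a regular expression. This is only a sketch of the wildcard rules described here; it assumes case-sensitive matching and no escape character for literal '%' or '_', neither of which this guide specifies.

```python
import re

def like_to_regex(pattern):
    """Translate a LIKE pattern into a compiled regular expression."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")           # any number of characters
        elif ch == "_":
            parts.append(".")            # exactly one character
        else:
            parts.append(re.escape(ch))  # literal character
    return re.compile("".join(parts), re.DOTALL)

def like(value, pattern):
    """Return True when a string value matches the whole LIKE pattern."""
    return isinstance(value, str) and like_to_regex(pattern).fullmatch(value) is not None

for candidate in ["stop", "at", "tea"]:
    print(candidate, like(candidate, "_t%"))
```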

Filter Messages in Range

Use the BETWEEN operator in the WHERE clause to filter on numeric, text, or date values within a specified range. The starting and ending part of the range also match the value. 

The following example selects all messages where the value of field a is within 1 and 5, inclusive.

SELECT * FROM `my-channel` WHERE a BETWEEN 1 AND 5

Nested Select

When you use a nested select in the FROM clause, the inner select acts as a temporary table for the outer select.

SELECT * FROM (SELECT * FROM `my-channel`)

Select a Nested Field

Because a message is a JSON or CBOR object, it can contain nested objects. In stream SQL, you can select nested fields from these objects using dot notation. 

For example, suppose the following message appears in the channel:

{
  "item" :
    {
      "name" : "sweater",
      "available" : 5
    }
}

The following queries demonstrate the use of nested fields:

Stream SQL: SELECT item.name FROM `my-channel`
Message: {"item.name" : "sweater"}
Comment: Selects the subfield name of item.

Stream SQL: SELECT item.name AS itemname FROM `my-channel`
Message: {"itemname" : "sweater"}
Comment: The default name of the output field in stream SQL is the JSON key name. You can change it to an alias using the AS keyword.

Stream SQL: SELECT item FROM `my-channel`
Message:
{
  "item":
    {
       "name": "sweater",
       "available": 5
    }
}
Comment: Selects all subfields of item.
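The dot-notation lookup can be mimicked on the client side, for example to predict the shape of returned messages. The sketch below is an emulation under assumptions, not RTM's implementation: the helper names are hypothetical, and a missing field is represented as None.

```python
def select_path(message, path):
    """Follow a dotted path through nested dicts; return None if absent."""
    current = message
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current

def project(message, path, alias=None):
    """Emulate SELECT path [AS alias]; the output key defaults to the path."""
    return {alias or path: select_path(message, path)}

message = {"item": {"name": "sweater", "available": 5}}
print(project(message, "item.name"))              # {'item.name': 'sweater'}
print(project(message, "item.name", "itemname"))  # {'itemname': 'sweater'}
```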

Create a Nested Field

You can generate nested messages by using dot notation in an alias.

For example, suppose the following message appears in the channel:

{
  "item" :
    {
      "name" : "sweater",
      "available" : 5
    }
}


Stream SQL: SELECT item.name AS item.name FROM `my-channel`
Message:
{
  "item" :
    {
      "name" : "sweater"
    }
}
Comment: Generates a subfield name under item.

Stream SQL: SELECT * AS item FROM `my-channel`
Message:
{
  "item" :
    {
      "item" :
        {
          "name" : "sweater",
          "available" : 5
        }
    }
}
Comment: Generates subfields under item using a wildcard.
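The effect of a dotted alias can be pictured as wrapping the selected value in one object per alias segment. The following sketch (the helper name is hypothetical) rebuilds the two results for the sample message on the client side.

```python
def nest_under_alias(alias, value):
    """Build a nested dict from a dotted alias such as 'item.name'."""
    result = value
    for key in reversed(alias.split(".")):
        result = {key: result}   # wrap from the innermost segment outward
    return result

message = {"item": {"name": "sweater", "available": 5}}

# SELECT item.name AS item.name
print(nest_under_alias("item.name", message["item"]["name"]))
# SELECT * AS item
print(nest_under_alias("item", message))
```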

Array Indexing

Use the bracket notation [ ] to extract information from an array. Indexing is 1-based.

Stream SQL accepts any expression as the index value.

SELECT `my-array`[index] FROM `my-channel`

SELECT * FROM `my-channel` WHERE `my-array`[0] > 1

CASE Statement

Use the CASE operator in the SELECT clause to execute an if-then-else condition on a per-message basis and return custom values in a new field.

The example below evaluates field1 in each message. If the value of field1 is "2001" for a given message, the value of item is set to "yes"; otherwise item is set to NULL. The returned message contains field1 and the result of the CASE expression in a new field named item.

SELECT field1,
    CASE WHEN field1 = "2001" THEN "yes"
             ELSE NULL END AS item FROM `my-channel`

Pseudo Fields

When clients publish messages to a channel, RTM adds a position and a timestamp to the message:

  • ROWID: A sequence number corresponding to the position of the message
  • TIMESTAMP: The timestamp of the message (in microseconds)

To have these fields appear in returned messages, you must specify them in your stream SQL.

Pseudo fields are available in queries that select from a specific channel with the FROM channel clause. 

You can't specify pseudo fields when selecting from a subquery with the FROM subquery clause.

SELECT ROWID, TIMESTAMP FROM `my-channel`

Compare Different Data Types

An RTM client can publish messages where each message uses a different type of data for the same field. To facilitate tests and comparison for these messages, Stream SQL does the following:

  • Automatically converts boolean data to integers according to the following rules:
    • true is auto-converted to 1.
    • false is auto-converted to 0.
  • Evaluates comparisons between data types in this order:
null < number < string < byte string < array < object

For example:

  • 7 > null
  • "7" > 7
  • (byte string) 0x07 > "7"
  • [ 1, 3, 4] > b'00001111'
  • { "item" : "sweater", "count": 5} > [1 , 3, 4]

With the following query, all of the input messages below are selected because the values of the item field (the number 7, the string "1", and the JSON object {"count":[0]}) all evaluate as greater than the number 3 in the WHERE clause.

SELECT * FROM `my-channel` WHERE 3 < item

Inputs:

  • {"item":7}
  • {"item":"1"}
  • {"item":{"count":[0]}}

Stream SQL does not auto-convert other data types.
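The cross-type ordering can be reproduced with a sort key that ranks a value by its type before comparing values. This is a sketch for reasoning about filters locally: the function names are hypothetical, and how RTM orders two arrays or two objects against each other is not described here, so the sketch treats values of those types as equal to one another.

```python
def type_rank(value):
    """Rank by type: null < number < string < byte string < array < object."""
    if value is None:
        return 0
    if isinstance(value, (bool, int, float)):
        return 1   # booleans behave as the integers 1 and 0
    if isinstance(value, str):
        return 2
    if isinstance(value, (bytes, bytearray)):
        return 3
    if isinstance(value, (list, tuple)):
        return 4
    if isinstance(value, dict):
        return 5
    raise TypeError("unsupported type: " + type(value).__name__)

def sort_key(value):
    """Return a tuple that compares by type rank first, then by value."""
    rank = type_rank(value)
    if rank in (1, 2):
        return (rank, value)
    if rank == 3:
        return (rank, bytes(value))
    return (rank, 0)   # within-type order of arrays and objects not modeled

def less_than(left, right):
    return sort_key(left) < sort_key(right)

inputs = [{"item": 7}, {"item": "1"}, {"item": {"count": [0]}}]
selected = [m for m in inputs if less_than(3, m["item"])]   # WHERE 3 < item
print(selected)
```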

Aggregate Streamview

Aggregate streamviews produce messages containing collected data for channel messages. For example, the aggregate function MAX(field) returns a message that contains the maximum value of field. Stream SQL always partitions the collected data into time periods and sends you a message at the end of each time period.

For example, suppose you subscribe with a period of 2 seconds and a stream SQL statement like this:

SELECT item, MAX(count) from `my-channel`

Every 2 seconds, RTM sends you a message containing the first value of item in the period (see Implicit FIRST_VALUE) and the maximum value of count for that 2 seconds.

The following streamview specifications automatically create an aggregate streamview:

  • period field

  • Stream SQL that contains an aggregate function such as MAX

  • Stream SQL that contains a window function such as FIRST_VALUE

Partitions

  • By default, messages in a partition are ordered by ROWID.
  • You can't change the partition inside your query, and the PARTITION BY clause is not supported in stream SQL. 
  • You can specify the length of a partition time period in seconds by specifying the period field in your subscription request. The default is 1 second, and the maximum is 60 seconds. 
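The time-based partitioning can be pictured with a small client-side emulation of SELECT item, MAX(count) with a period of 2 seconds. It is a sketch under assumptions: messages arrive as (timestamp in seconds, message) pairs already in ROWID order, windows are aligned to timestamp zero, only numeric count values are considered, and the output key max_count is made up for the example.

```python
from collections import defaultdict

def partition_by_period(messages, period_seconds=1):
    """Group (timestamp_seconds, message) pairs into fixed time windows."""
    partitions = defaultdict(list)
    for timestamp, message in messages:
        partitions[int(timestamp // period_seconds)].append(message)
    return [partitions[index] for index in sorted(partitions)]

def first_item_and_max_count(partition):
    """Emulate SELECT item, MAX(count) for one partition."""
    counts = [m["count"] for m in partition
              if isinstance(m.get("count"), (int, float))]
    return {
        "item": partition[0].get("item"),               # first value in the period
        "max_count": max(counts) if counts else None,   # hypothetical key name
    }

channel = [
    (0.2, {"item": "hat", "count": 4}),
    (1.5, {"item": "coat", "count": 9}),
    (2.1, {"item": "scarf", "count": 1}),
    (3.9, {"item": "hat", "count": 7}),
]
results = [first_item_and_max_count(p) for p in partition_by_period(channel, 2)]
print(results)
```

Each dictionary in results stands for the single message RTM would send at the end of one period.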

Implicit FIRST_VALUE

For each partition, stream SQL returns exactly one message containing the aggregated results.

By default, if you specify a non-aggregated field in an aggregate stream SQL statement, RTM returns the FIRST_VALUE for the field.

For example:

SELECT item, MAX(count) FROM `my-channel`

is the same as

SELECT FIRST_VALUE(item), MAX(count) FROM `my-channel`

To avoid ambiguity, explicitly specify the value to select for each non-aggregated field in an aggregate streamview, using FIRST_VALUE or LAST_VALUE.

ORDER BY for Window Functions

In SQL, window functions are a type of aggregate function. They control how messages in a group are partitioned, ordered, or both.

When you use a window function in a query, you can change the order in which RTM handles returned messages on a per field basis using the OVER (ORDER BY) clause. For example:

SELECT FIRST_VALUE(color) OVER (ORDER BY item) FROM `my-channel`

The preceding query first orders the returned messages by item and then picks the value of color from the first message.
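A local emulation makes the effect of the ordering visible. The sketch below assumes that list order stands in for ROWID order, that the ORDER BY field is present in every message with mutually comparable values, and that ties keep their original order; the function name and sample data are hypothetical.

```python
def first_value(messages, field, order_by=None):
    """Emulate FIRST_VALUE(field) [OVER (ORDER BY order_by)] for one partition."""
    ordered = list(messages)            # input order stands in for ROWID
    if order_by is not None:
        ordered.sort(key=lambda m: m[order_by])   # list.sort is stable
    return ordered[0].get(field) if ordered else None

partition = [
    {"item": "hat", "color": "red"},
    {"item": "coat", "color": "blue"},
    {"item": "coat", "color": "green"},
]
print(first_value(partition, "color"))                   # default order
print(first_value(partition, "color", order_by="item"))  # ordered by item
```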

See Stream SQL Window Functions for a list of window functions in Stream SQL.

GROUP BY to Partition the Output

Use GROUP BY to partition your returned messages. Each group created within a partition results in a single message. Stream SQL uses an implicit FIRST_VALUE for each group.

This groups the returned messages, not the channel messages. RTM only partitions channel messages by time period.

In the following example, your returned messages are first grouped by the item_number field, and then RTM sends you the first message of each group:

stream SQL:
SELECT size, count FROM channel GROUP BY item_number

returned messages, before grouping:

{"item_number" : 1, "size" : "first"}
{"item_number" : 1, "size" : "largest", "count" : 19, "cost" : "1"}
{"item_number" : 2, "size" : "first",  "count" : 13}
{"item_number" : 1, "size" : "smallest", "count" : 11}
{"item_number" : 2, "size" : "smallest", "count" : 11, "cost" : 0, "tax" : 5}

messages sent to you:

{"item_number" : 1, "size" : "first", "count" : null}
{"item_number" : 2, "size" : "first", "count" :13}
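The grouping in this example can be reproduced locally. The sketch below (a hypothetical helper, not RTM code) keeps the values from the first message seen in each group and fills a missing field with None, which corresponds to null in the messages sent to you.

```python
def group_first_values(messages, group_field, selected_fields):
    """Keep the selected fields of the first message in each group."""
    groups = {}
    for message in messages:
        key = message.get(group_field)
        if key not in groups:               # later messages in a group are ignored
            row = {group_field: key}
            for field in selected_fields:
                row[field] = message.get(field)   # None when the field is missing
            groups[key] = row
    return list(groups.values())

returned = [
    {"item_number": 1, "size": "first"},
    {"item_number": 1, "size": "largest", "count": 19, "cost": "1"},
    {"item_number": 2, "size": "first", "count": 13},
    {"item_number": 1, "size": "smallest", "count": 11},
    {"item_number": 2, "size": "smallest", "count": 11, "cost": 0, "tax": 5},
]
sent = group_first_values(returned, "item_number", ["size", "count"])
print(sent)
```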

Use the HAVING clause with GROUP BY to retrieve a subset of the groups created during the partition. RTM only sends you an aggregated message if the condition evaluates to true for that group.

SELECT * FROM `my-channel` GROUP BY a HAVING b > 0

If the stream SQL query results in too many groups, RTM drops all messages for the aggregation period and sends an info message to your client with the info field set to filter_too_many_groups.

Stream SQL Functions

The following table describes the general-purpose functions that stream SQL supports.

Function (arguments)
Description
CONCAT(field1[,field2,...])

Returns a string created by concatenating the values of all the specified fields. All non-string values are treated as the empty string ("").
UPPER(field)
- Returns the value of field with all characters
  converted to uppercase.
- For non-string values, UPPER() returns the value.
- If the value is null, UPPER() returns null.
The conversion is done using the en_US.UTF locale.
LOWER(field)
- Returns the value of field with all characters
  converted to lowercase.
- For non-string values, LOWER() returns the value.
- If the value is null, LOWER() returns null.
The conversion is done using the en_US.UTF locale.
SUBSTR([field|string],position[,length])

- Returns a substring of the value of field, starting at position and containing at most length characters.
- If length is omitted, or length is greater than the length of the value, the entire value starting at position is returned.
- Position < 1 causes SUBSTR to start at the end of the value and move leftwards.
Examples:
- SUBSTR("abc", 1) returns "abc"
- SUBSTR("abc", 2, 1) returns "b"
- SUBSTR("abc", -2) returns "bc"
JSON(field)
Parses the string value in field and returns a JSON object (similar to the JavaScript JSON.parse() method). You can then use object references to refer to fields.

Example:
If the message contains {"cookie":"{\"gold\":20}"}, JSON(cookie) returns an object containing the field gold. The value of gold is 20.
You can use the following stream SQL:
SELECT JSON(cookie) as json_obj from `my_channel` where json_obj.gold = 20
to return messages that contain {"cookie":"{\"gold\":20}"}.
ABS([field | number])
Returns the absolute value of field or number. If the parameter is non-numeric, returns 0.
DECODE(exp,tgt1,val1[,tgt2,val2...])
Evaluates exp, then searches for a match in the values of tgt1 through tgtn. If it finds a match, it returns the corresponding val; otherwise, it returns null.
Example:
If the value of item is 1 and the value of count is 3
- DECODE (item, 1, "one") returns "one"
- DECODE (count, 1, "one") returns null
- DECODE (count,1,"one",2,"two",3,"three") returns "three"
MD5(field)
Returns the MD5 hash of the value of field.
Use MD5() to test the data integrity of a message.
MD5() hashes strings and passes all other types through unchanged.
SHA1(field)
Returns the SHA1 hash of the value of field.
Use SHA1() to test the data integrity of a message.
SHA1() hashes strings and passes all other types through unchanged.
CHECKSUM( * | field1 [,field2 ...])
Returns the CHECKSUM of the values of the arguments.
Use the return value to verify data integrity.
Unlike MD5() and SHA1(), CHECKSUM converts all types of data. For hashing string data, CHECKSUM has a higher degree of collisions than MD5 or SHA1. If you are checking strings, use MD5 or SHA1 to get better results.

Examples:
- CHECKSUM (bus_position, bus_capacity)
- CHECKSUM (*) 
COALESCE(field1 [,field2 ...])
Returns the first non-null value from the list of fields, or null if none exists.
Example: For the message
{ "item": null, "count": null, "size": 5}
COALESCE(item, count, size) returns 5
IFNULL((exp | field | * ), value_if_null)
If the value of the first argument is null, return value_if_null. Otherwise, return the value of the first argument.
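The behavior of a few of these functions can be checked against a local emulation. The sketch below models SUBSTR, DECODE, and COALESCE as described in the table, using hypothetical Python helpers. A position of 0 is rejected because the table does not pin down what it selects, and null is represented as None.

```python
def substr(value, position, length=None):
    """Emulate SUBSTR: 1-based start, negative positions count from the end."""
    if position == 0:
        raise ValueError("position 0 is not modeled in this sketch")
    start = position - 1 if position > 0 else max(len(value) + position, 0)
    end = len(value) if length is None else start + length
    return value[start:end]   # slicing clips a length that runs past the end

def decode(expression, *pairs):
    """Emulate DECODE: return the val paired with the first matching tgt."""
    for target, result in zip(pairs[0::2], pairs[1::2]):
        if expression == target:
            return result
    return None

def coalesce(*values):
    """Emulate COALESCE: first value that is not None, else None."""
    for value in values:
        if value is not None:
            return value
    return None

print(substr("abc", 2, 1))                            # b
print(substr("abc", -2))                              # bc
print(decode(3, 1, "one", 2, "two", 3, "three"))      # three
print(coalesce(None, None, 5))                        # 5
```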


Stream SQL Aggregate Functions

The following table describes the aggregate functions in stream SQL.

Function (arguments)
Description
COUNT(arg)
Returns the count of the argument.
Examples:
- COUNT(*) returns the number of messages.
- COUNT(item) returns the number of non-null item fields in the messages.
- COUNT(item.name) returns the number of name fields in the messages.
MAX(field)
Returns the largest value of field among all messages in the partition. Defaults to null.
MIN(field)
Returns the smallest value of field among all messages in the partition. Defaults to null.
SUM(field)
Returns the sum of the field's values for the partition. The default is zero.
AVG(field)
Returns the filtered average of all numeric values of field for all the messages in the partition. AVG() skips messages for which field is non-numeric or null. The default is zero.
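The skipping rules for COUNT and AVG are easy to misread, so here is a small emulation over one partition. It is a sketch with hypothetical helper names; it treats only int and float values as numeric (excluding booleans, which is an assumption) and represents null as None.

```python
def _is_number(value):
    """True for int and float values; booleans are excluded by assumption."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)

def count(messages, field=None):
    """Emulate COUNT(*) when field is None, else COUNT(field)."""
    if field is None:
        return len(messages)
    return sum(1 for m in messages if m.get(field) is not None)

def avg(messages, field):
    """Emulate AVG(field): skip non-numeric and null values, default to 0."""
    numbers = [m[field] for m in messages if _is_number(m.get(field))]
    return sum(numbers) / len(numbers) if numbers else 0

partition = [{"count": 4}, {"count": "n/a"}, {}, {"count": None}, {"count": 8}]
print(count(partition))            # 5 messages
print(count(partition, "count"))   # 3 non-null count fields
print(avg(partition, "count"))     # 6.0, the average of 4 and 8
```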

Stream SQL Window Functions

The following window functions are supported in Stream SQL.

Function
(arguments)
Description
FIRST_VALUE(field)
[ OVER ( ORDER BY expr [ ASC | DESC ])]
Returns the first value of field after sorting messages according to the ORDER BY expression.
Default order is by ROWID.
LAST_VALUE( field )
[ OVER ( ORDER BY expr [ ASC | DESC ])]
Returns the last value of field after sorting messages according to the ORDER BY expression.
Default order is by ROWID.
MERGE_FIRST( * | field )
[ OVER ( ORDER BY expr [ ASC | DESC ])]
MERGE_FIRST(*) merges returned messages and sends you a message containing the first non-null value of each field in the merged result.
MERGE_FIRST(field) merges returned messages that contain field and sends you a message containing the first non-null value of each field in the merged result.
MERGE_LAST( * | field )
[ OVER ( ORDER BY expr  [ ASC | DESC ])]
MERGE_LAST(*) merges returned messages and sends you a message containing the last non-null value of each field in the merged result.
MERGE_LAST(field) merges returned messages that contain field and sends you a message containing the last non-null value of each field in the merged result.


Examples

Function: FIRST_VALUE(*) OVER (ORDER BY item)
Inputs:
{"item":"hat","count":4}
{"item":"hat","count":1}
{"item":"coat","count":2}
{"item":"coat","count":3}
Output:
{"item":"coat","count":2}

Function: LAST_VALUE(*)
Inputs:
{"item":null,"count":4}
{"item":2,"count":1}
{"item":"hat","count":2}
{}
{"item":1,"count":3}
Output:
{"item":"hat","count":3}

Function: LAST_VALUE(count)
Inputs:
{"count":null}
{"count":true}
{}
{"count":12}
{}
Output:
{"count":12}

Function: MERGE_FIRST(items)
Inputs:
{"items":{"hat":11}}
{"items":{"coats":{"XL":1}}}
Output:
{"items":{"hat":11,"coats":{"XL":1}}}

Function: MERGE_FIRST(*) OVER (ORDER BY size)
Inputs:
{"count":1,"size":1}
{"count":2,"size":2}
{"count":3}
{}
{"count":4,"size":null}
Output:
{"count":3,"size":1}

Function: MERGE_LAST(*) OVER (ORDER BY size)
Inputs:
{"count":20,"size":3}
{"count":10,"size":1,"units":6}
{"count":3}
{}
{"count":2,"size":null}
Output:
{"count":20,"size":3,"units":6}

Function: MERGE_LAST(type)
Inputs:
{}
{"type":1}
{"type":2}
{"type":3}
{"type":{"ord":1}}
{"type":{"ord":2, "size":2}}
{}
Output:
{"type":{"ord":2, "size":2}}
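The merge behavior with an ORDER BY clause can be explored with a simplified emulation. The sketch below is not RTM's implementation: the helper names are hypothetical, it merges top-level fields only (no merging inside nested objects), it handles only null and numeric ORDER BY values (with null sorting first, as in the type ordering described earlier), and it keeps ties in their original order. It is run on the inputs of the two ORDER BY size examples.

```python
def _order_key(value):
    """Sort None before numbers, following null < number."""
    return (0, 0) if value is None else (1, value)

def merge(messages, order_by=None, keep="first"):
    """Emulate MERGE_FIRST(*) or MERGE_LAST(*) for one partition."""
    ordered = list(messages)             # input order stands in for ROWID
    if order_by is not None:
        ordered.sort(key=lambda m: _order_key(m.get(order_by)))
    if keep == "last":
        ordered.reverse()                # last non-null value becomes the first seen
    merged = {}
    for message in ordered:
        for field, value in message.items():
            if value is not None and field not in merged:
                merged[field] = value
    return merged

first_inputs = [{"count": 1, "size": 1}, {"count": 2, "size": 2},
                {"count": 3}, {}, {"count": 4, "size": None}]
last_inputs = [{"count": 20, "size": 3}, {"count": 10, "size": 1, "units": 6},
               {"count": 3}, {}, {"count": 2, "size": None}]

print(merge(first_inputs, order_by="size", keep="first"))
print(merge(last_inputs, order_by="size", keep="last"))
```

Messages without a size field sort first here because a missing field is treated as null.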

Differences between stream SQL and SQL:2003

The following table lists the syntax and behavior differences between stream SQL and SQL:2003. Use the table to identify where Stream SQL deviates from the SQL standard as you create streamviews using Stream SQL.

Feature | Description
Window functions | Stream SQL and SQL:2003 differ in their implementation of these window functions: FIRST_VALUE, LAST_VALUE, MERGE_FIRST, and MERGE_LAST.
PARTITION BY | Not supported in stream SQL. Stream SQL only partitions by time period. Use the period field in your subscribe request to set the period.
ORDER BY | Stream SQL only allows the ORDER BY clause inside the OVER clause. The OVER clause is optional. The default ordering is OVER(ORDER BY ROWID).
Implicit FIRST_VALUE | For aggregate and window functions, stream SQL returns exactly one message for each partition, while SQL:2003 returns a list of all rows in a group. Stream SQL implicitly uses FIRST_VALUE to select the value to return.
Pseudo fields | Stream SQL provides two Satori-specific pseudo fields for each message: ROWID and TIMESTAMP.
Nested SELECT statements | Streamviews allow nested SELECT statements inside the FROM clause. Use a nested SELECT to specify the messages to use as inputs for the outer SELECT. For example, use a nested SELECT to calculate an aggregate of aggregates, such as an average of sums.
JOIN | Not supported in stream SQL.
LIMIT, TOP, SORT | Not supported in stream SQL.