UDP¶
Overview¶
The protocol described here is build on top of the UDP network protocol.
Reasons for using
Fast transmission between geographical locations, e.g. using microwave.
Open protocol and FlatBuffers makes it possible to consume data streams from any language.
All gateways optionally support this feature.
The roq-udp-subscriber implements a read-only gateway.
Format¶
A header occupies the first 16 bytes.
An optional payload may follow.
Maximum length of a payload is 512 bytes.
A payload is a fragment of a larger message when the message length exceeds 512 bytes.
The header contains fields to manage and detect
Reset (session id)
Re-assembly (sequence number, current fragment number, max fragment number)
Re-ordering within a single channel (sequence number)
Recovery within a single channle (sequence number, last sequence number)
Correlation between two channels (sequence number, last sequence number, object type, object id and snapshot)
Offset |
Width |
Type |
Description |
---|---|---|---|
0 |
1 |
|
Control (bit fields)
|
1 |
1 |
|
Current fragment number (0-based) |
2 |
1 |
|
Max fragment number (0-based) |
3 |
1 |
|
object type (distinct for session, opaque value) |
4 |
2 |
|
Object ID (distinct for session, opaque value) |
6 |
2 |
|
Session ID (random number, e.g. seconds since epoch % (2^16-1)) |
8 |
4 |
|
Sequence number (will wrap around to zero after reaching (2^32-1)) |
12 |
4 |
|
Last published sequence number of the encoded object (identified by object type + object id) |
16 |
[0;512] |
Encoded payload (may be empty, e.g. heartbeat) |
Note
The maximum length of the encoded payload is 512 bytes. A message larger than 512 bytes will be split into a number of fragements such that the payload of each fragment has the length 512 except for the last which will have a length less than or equal to 512. The total number of fragments can not exceed 256.
The choice of 512 bytes is losely based on this Stack Overflow discussion.
Features¶
Session ID¶
The producer will upon start-up generate a random number identifying the session. The choice of random number is implementation specific and may include a simple solution like number of seconds since epoch % (2^16-1).
The consumer must monitor this value and reset internal state if it changes. In particular, object type’s and ID’s are not guaranteed to be consistent between sessions.
Channels¶
There are two channels
The snapshot channel is used to regularly publish the latest object state.
The incremental channel is used to publish all object updates in real-time.
Sequence Number¶
The sequence number is an incrementing unsigned 32 bit integer which is allowed to wrap back to zero after it reaches its maximum value (2^32-1).
The sequence number will repeat if there are more fragments: all fragments belonging to the same message will share the same sequence number.
Each channel will have its own sequence number.
Last Sequence Number¶
This sequence number should be correlated with the sequence number received from the incremental channel.
This value makes it possible to determine when the object was last updated.
This is useful when correlating snapshot and incremental updates, e.g. during the initialization of an order book.
Another use case is when a dropped UDP datagram has been detected. It is then possible to mark all managed objects to be in an unknown state and initiate initialization. The object can recover if an incremental update is received and the last sequence number can be matched. This may be a much quicker recovery procedure than waiting for updates from the snapshot channel.
Fragments¶
There are two fields to manage fragments: current and maximum fragment number.
Fragments have a length of 512 bytes and there can be a maximum of 256 fragments. It is therefore possible to set up pre-allocated buffers of size 128 KiB (=256*512=131072 bytes). The index into the buffer is current fragment number * 512. The total length of the message is known when current fragment number equals maximum fragment number: the total length is then current fragment number * 512 + len(payload).
Encoding¶
The encoding of the message is communicated through a bit field.
Snapshot¶
Updates from the incremental channel can be identified as snapshot or incremental based on a bit flag. This makes it possible to accumulate buffers in memory without decoding while waiting for the snapshot channel as could be an initialization procedure for order books. This accumulation can be discarded if a snapshot arrives on the incremental channel.
GatewayStatus¶
A subscriber should not communicate StreamStatus
from the origin.
Rather, it should use masked version of the already aggregated GatewayStatus::supported
.
Note
It is important to mask the bit-mask of supported object types because the origin gateway may support object types that are not being broadcast on UDP.
An overlay to this is to use inactivity to communicate ConnectionStatus::DISCONNECTED
when no messages
has arrived during a configurable period and then revert to ConnectionStatus::READY
when any message arrive.
Note
This will work during inactive market hours due to the underlying transport supporting heartbeat.
Implementation¶
The following outlines the possible implementation of a subscriber.
State¶
State could be managed per pair of object type and object id.
Could be something like this
struct State final {
bool ready;
std::optional<uint32_t> last_seqno;
};
// {object_type, object_id} --> State
absl::node_hash_map<std::pair<uint8_t, uint16_t>, State> state;
Incremental¶
The updates can be processed directly and can use the state object to communicate with the snapshot channel.
This is the logic required to manage the updates
if (state.ready) { // the object is already in the ready state
state.last_seqno = header.seqno; // keep track of last_seqno -- we need it if there are drops
} else { // we are waiting for something...
if (header.snapshot) { // a snapshot will make the object ready
state.ready = true;
state.last_seqno = header.seqno;
} else {
if (!state.last_seqno.has_value()) { // collect updates, wait for snapshot after this seqno
state.last_seqno = header.seqno; // note! this is the *first* seqno while we collect
}
}
}
Snapshot¶
A snapshot is only decoded if it’s truly needed.
Something like this
auto include = [&header, &state]() {
if (header.object_type == 0x0) // special value, always include
return true;
if (state.ready) { // no need, the object is already in the ready state
return false;
} else {
if (state.last_seqno.has_value()) { // we are collecting and waiting for a snapshot
if ((*state.last_seqno) <= header.last_seqno) { // note! last_seqno is from the incremental channel
return true;
} else {
return false;
}
} else { // not ready and not collecting... dispatch the snapshot
return true;
}
}
}();
if (include) {
state.last_seqno = header.last_seqno; // note! last_seqno is from incremental channel
// ... only decode the message if it's truly needed
}