-
Notifications
You must be signed in to change notification settings - Fork 66
Subscriptions from DEALER sockets
When sending a subscription query to the Libbitcoin BS server, we must use a ZMQ DEALER socket, since the resulting message exchange will not strictly follow the request-reply pattern as in our introductory Bitcoin Server query example.
Instead, the subscription update notifications can come at any time. Therefore, we must use the ZMQ dealer socket to subscribe for and listen for subscription updates, since this ZMQ socket type allows asynchronous messaging patterns.
Shown below is a schematic of an asynchronous message exchange with the BC server. Unlike the ZMQ REQ socket, the DEALER socket does not automatically prepend a delimiter frame to each ZMQ query message. The BC server will accept requests with or without the delimiter frame, and respond in-kind.
Since the ZMQ DEALER socket is asynchronous, it does not need to wait for past requests to be answered before submitting new queries. In our example, we will periodically submit subscription (renewal) requests, and continuously poll our socket for new messages.
Our example demonstrates a simple script performs the following logic to subscribe to updates of our (testnet) public key hash.
- Endless while loop
- Submit subscription request periodically
- Poll DEALER socket for received messages
- Case 0: Parse subscription confirmation
- Case 1: Parse public key hash update notification
This is implemented in the script below.
#include <iostream>
#include <chrono>
#include <bitcoin/protocol.hpp>
#include <bitcoin/bitcoin.hpp>
int main() {
// Setup of zmq context & socket
//--------------------------------------------------------------------------
// Create context
bc::protocol::zmq::context my_context(true); //started
bc::protocol::zmq::socket subscription_socket(
my_context,
bc::protocol::zmq::socket::role::dealer
);
// Create socket & connect to endpoint.
bc::code ec;
bc::config::endpoint public_endpoint("tcp://testnet1.libbitcoin.net:19091");
ec = subscription_socket.connect(public_endpoint); // catch error here
// Adding our socket to our poller. Multiple Sockets possible.
bc::protocol::zmq::poller my_poller;
my_poller.add(subscription_socket);
// Subscription socket ID
bc::protocol::zmq::identifier subscription_socket_id =
subscription_socket.id();
// Initiate next subscription time.
std::chrono::steady_clock::time_point next_subscribe_time =
std::chrono::steady_clock::now();
// Initiate Message ID for message correlation.
// Not implemented in this example.
uint32_t message_id(0);
// Rewewal period necessary for subscription (depends on server config)
auto renewal_period = std::chrono::minutes(5);
Note, that we should set the subscription renewal period according to the server config settings, so that our subscription never expires as long as we keep our DEALER socket alive.
We can then set the public key hash we would like to monitor updates for. The testnet address is commented below, so the reader may send testnet coins to the address and observe the resulting subscription notifications console print-outs.
// Public key hash we are subscribing to.
//--------------------------------------------------------------------------
// Address Format: mk7gLaQfT3kEuZ3fAo3SivRkPWYgkFDfjH
auto my_pubkey_hash = bc::to_chunk(bc::base16_literal(
"3271bdf25c28ec5733b3faac3c3adb2b6fd0d15c"));
We run a endless loop below since the updates can be occur anytime after our subscription query.
// Main Subscription Renewal & Polling Loop.
//--------------------------------------------------------------------------
while (true)
{
// Subscription Renewals.
//----------------------------------------------------------------------
std::chrono::steady_clock::time_point now_time
= std::chrono::steady_clock::now();
if (now_time >= next_subscribe_time)
{
// Send subscription request/renewal
bc::protocol::zmq::message my_request;
std::string command = "subscribe.address";
// 4-byte message identifier
bc::data_chunk payload(bc::to_chunk(my_pubkey_hash)); // empty
my_request.enqueue(bc::to_chunk(command));
my_request.enqueue(bc::to_chunk(
bc::to_little_endian(message_id)));
my_request.enqueue(payload);
// Socket send: Success/Failure
if ((ec = my_request.send(subscription_socket)))
{
std::cout << ec.message() << std::endl;
return 1;
}
// Set subscription rewewal period
next_subscribe_time += renewal_period;
message_id++;
}
The socket polling below shares the same loop as the subscription renewals. Inbound messages are either subscription confirmations or public key hash updates and can be distinguished by the command frame in the respective response.
// Polling Sockets for Subscription Confirmation / Updates.
//----------------------------------------------------------------------
bc::protocol::zmq::identifiers socket_ids = my_poller.wait(2000);
if (socket_ids.contains(subscription_socket_id))
{
bc::protocol::zmq::message server_response;
server_response.receive(subscription_socket);
// Frame 0: Read the response command.
std::string response_command = server_response.dequeue_text();
// Frame 1: Read the message id (No checks implemented).
uint32_t my_message_id;
server_response.dequeue(my_message_id);
// Frame 2: Read the payload.
bc::data_chunk reply_payload;
server_response.dequeue(reply_payload);
// Payload[:4]: Read out error code
bc::data_source reply_byte_stream(reply_payload);
bc::istream_reader reply_byte_stream_reader(reply_byte_stream);
bc::code message_ec = reply_byte_stream_reader.read_error_code();
// Terminate if response message error.
if (message_ec != bc::error::success )
{
std::cout << "Response Error" << std::endl;
std::cout << message_ec.message() << std::endl;
return 1;
}
if(response_command == "subscribe.address")
{
// subscription confirmed
std::cout << "Subscription confirmed" << std::endl;
}
// Parse Update for block height & transaction hash
//------------------------------------------------------------------
else if(response_command == "notification.address")
{
// Update notification sequence.
// Not implemented: Update sequence check.
uint16_t update_sequence =
reply_byte_stream_reader.read_2_bytes_little_endian();
// Blockheight.
uint32_t height =
reply_byte_stream_reader.read_4_bytes_little_endian();
// Transaction hash (little endian).
// We use istream reader to reverse byte order.
auto tx_hash_little_endian =
reply_byte_stream_reader.read_bytes();
bc::data_source tx_hash_byte_stream(tx_hash_little_endian);
bc::istream_reader tx_hash_stream_reader(tx_hash_byte_stream);
auto tx_hash =
tx_hash_stream_reader.read_reverse<bc::hash_size>();
std::cout
<< "---------------" << std::endl
<< "Update received" << std::endl
<< "Notification sequence: "
<< update_sequence << std::endl
<< "Block height: "
<< height << std::endl
<< "Transaction hash: "
<< bc::encode_base16(bc::to_chunk(tx_hash))
<< std::endl;
}
}
// Code if messages received from other sockets
}
// kill socket()
// kill context()
return 0;
}
When we start the script and subsequently send a transaction to the address corresponding to the subscribed public key hash, we will get the following print out below.
---------------
Subscription confirmed
---------------
Update received
Notification sequence: 1
Block height: 0
Transaction hash: 397670c549b5663347b90b1c52edeb9a6bbcc70ba61f67864182cedfec282856
---------------
Subscription confirmed
---------------
Subscription confirmed
---------------
Update received
Notification sequence: 2
Block height: 1298014
Transaction hash: 397670c549b5663347b90b1c52edeb9a6bbcc70ba61f67864182cedfec282856
Notice the first notification indicates a block height of 0, and is received as soon as the transaction is detected in the transaction pool. The second notification arrives once the transaction is mined and organised into the strong chain. The subscription confirmations result from the periodic renewal queries.
Users | Developers | License | Copyright © 2011-2024 libbitcoin developers
- Home
- Build Server
- Download Server
- Frequently Asked Questions
- General Information
- Client Server Interface
- Configuration Settings
- Tutorials