Core Publish-Subscribe in Messaging
This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:
- Delivery is at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
- There are two circumstances when a published message won’t be delivered to a subscriber:
  - The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
  - There is a network interruption where the message is ultimately dropped
- Messages are published to subjects, which consist of one or more concrete tokens, e.g. greet.bob. Subscribers can use wildcards to express interest in a set of matching subjects.
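To make the wildcard rule concrete, here is a minimal sketch of how a subject could be checked against a subscription pattern. It assumes the standard NATS wildcard semantics, where `*` matches exactly one token and `>` matches one or more trailing tokens. The `SubjectMatch` module and its `matches?/2` function are hypothetical names for illustration only; they are not part of the Gnat library, and the server performs this matching for you.

```elixir
defmodule SubjectMatch do
  # Hypothetical helper, not part of Gnat: returns true when `subject`
  # matches `pattern` under NATS wildcard rules.
  def matches?(pattern, subject) do
    do_match(String.split(pattern, "."), String.split(subject, "."))
  end

  # `>` as the final pattern token matches one or more remaining tokens.
  defp do_match([">"], [_ | _]), do: true
  # `*` matches exactly one token, whatever its value.
  defp do_match(["*" | p_rest], [_ | s_rest]), do: do_match(p_rest, s_rest)
  # A literal token must be identical in pattern and subject.
  defp do_match([tok | p_rest], [tok | s_rest]), do: do_match(p_rest, s_rest)
  # Both sides exhausted at the same time: full match.
  defp do_match([], []), do: true
  defp do_match(_, _), do: false
end

IO.inspect(SubjectMatch.matches?("nbe.*", "nbe.news"))        # true
IO.inspect(SubjectMatch.matches?("nbe.*", "nbe.news.today"))  # false
IO.inspect(SubjectMatch.matches?("nbe.>", "nbe.news.today"))  # true
```

The second call returns false because `*` covers a single token only; a subscriber that wants every subject below `nbe` would use `nbe.>` instead.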
Code
Mix.install([
  # For documentation on the Gnat library, see https://hexdocs.pm/gnat/readme.html
  {:gnat, "~> 1.6"},
  {:jason, "~> 1.0"}
])
url = System.get_env("NATS_URL", "nats://127.0.0.1:4222")
uri = URI.parse(url)
Call Gnat.start_link to start a connection to the NATS server. The returned pid is used for the calls below.
{:ok, gnat} = Gnat.start_link(%{host: uri.host, port: uri.port})
Manual subscriptions are straightforward: supply a topic and the target
pid that will receive the {:msg, m} messages from the subscription.
{:ok, subscription} = Gnat.sub(gnat, self(), "nbe.*")
Here we publish a message to a subject that has a subscriber.
:ok = Gnat.pub(gnat, "nbe.news", "NATS by example is a great learning resource!")
In Elixir, an explicit receive call blocks the current process until a matching
message arrives in its mailbox. In this case, we’re waiting for the {:msg, m} tuple from
the NATS client.
receive do
  {:msg, %{body: body, topic: "nbe.news", reply_to: nil}} ->
    IO.puts("Manual subscription received: '#{body}'")
end
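Because delivery is at-most-once, a bare receive can block indefinitely if the message is dropped. The sketch below shows one way to bound the wait with an `after` clause. To stay runnable without a NATS server, it uses plain process messaging as a stand-in for the Gnat client and only mirrors the {:msg, m} tuple shape used above. The `ReceiveWithTimeout` module and the timeout values are assumptions chosen for illustration.

```elixir
defmodule ReceiveWithTimeout do
  # Hypothetical helper: waits up to `timeout_ms` for a {:msg, m} tuple
  # on the given topic and returns {:ok, body} or :timeout.
  def await(topic, timeout_ms) do
    receive do
      {:msg, %{body: body, topic: ^topic}} -> {:ok, body}
    after
      timeout_ms -> :timeout
    end
  end
end

# Simulate a delivery by sending the tuple to ourselves (no NATS server needed).
send(self(), {:msg, %{body: "hello", topic: "nbe.news", reply_to: nil}})
IO.inspect(ReceiveWithTimeout.await("nbe.news", 1_000))  # {:ok, "hello"}

# Nothing was sent on this topic, so the call gives up after 100 ms.
IO.inspect(ReceiveWithTimeout.await("nbe.sports", 100))  # :timeout
```

Returning :timeout instead of blocking lets the caller decide whether to retry, log, or carry on.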
Now let’s move on to more resilient, production-grade ways of subscribing.
In addition to one-off subscriptions, you can create a consumer supervisor
that is meant to run for a long time and survive network partition events.
This consumer supervisor invokes callbacks in a module that conforms to the
Gnat.Server behavior, like this DemoServer module.
defmodule DemoServer do
  use Gnat.Server
  def request(%{body: body, topic: topic}) do
    IO.puts("Received message on '#{topic}': '#{body}'")
    :ok
  end
  # The error handler is an optional callback. Gnat.Server has a default one that you
  # can use.
  def error(%{gnat: gnat, reply_to: reply_to}, _error) do
    Gnat.pub(gnat, reply_to, "Something went wrong and I can't handle your message")
  end
end
The Gnat.ConnectionSupervisor is a process that monitors your NATS connection. If the connection
is lost, this process retries according to its backoff settings until the connection is re-established.
gnat_supervisor_settings = %{
  name: :gnat,
  backoff_period: 4_000,
  connection_settings: [
    %{host: uri.host, port: uri.port}
  ]
}
{:ok, _conn} = Gnat.ConnectionSupervisor.start_link(gnat_supervisor_settings)
The connection supervisor’s start_link establishes a connection asynchronously, so we need to
delay here until the connection is running. This isn’t normally a problem when putting connection
supervisors into a supervision tree at startup.
if Process.whereis(:gnat) == nil do
  Process.sleep(300)
end
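A single fixed sleep may be too short on a slow network. One alternative is to poll for the registered name with a bounded number of retries. The sketch below is self-contained: it uses a spawned process that registers itself late as a stand-in for the asynchronous connection. The `WaitFor` module, the `:demo_conn` name, and the retry values are hypothetical choices for illustration.

```elixir
defmodule WaitFor do
  # Hypothetical helper: polls until `name` is registered or the retry
  # budget is used up. Returns {:ok, pid} or {:error, :timeout}.
  def registered(name, retries \\ 20, interval_ms \\ 50) do
    case Process.whereis(name) do
      nil when retries > 0 ->
        Process.sleep(interval_ms)
        registered(name, retries - 1, interval_ms)

      nil ->
        {:error, :timeout}

      pid ->
        {:ok, pid}
    end
  end
end

# Stand-in for the connection: a process that registers itself after 120 ms.
spawn(fn ->
  Process.sleep(120)
  Process.register(self(), :demo_conn)
  Process.sleep(:infinity)
end)

# Prints {:ok, pid} once the name shows up.
IO.inspect(WaitFor.registered(:demo_conn))

# This name is never registered, so the retries run out.
IO.inspect(WaitFor.registered(:never_registered, 3, 10))  # {:error, :timeout}
```

In the script above, the equivalent call would be `WaitFor.registered(:gnat)` in place of the single sleep.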
Consumer supervisors work in tandem with connection supervisors. The connection_name setting
refers to the name of a supervised connection, and not the Gnat application.
consumer_supervisor_settings = %{
  connection_name: :gnat,
  # The module that implements the Gnat.Server behavior
  module: DemoServer,
  # We can subscribe on multiple topics, each of which can have wildcards
  subscription_topics: [
    %{topic: "rpc.demo", queue_group: "demo"}
  ]
}
In most applications the connection and consumer supervisors are started as part of the
supervision tree, but for this sample we start the consumer supervisor manually via start_link.
{:ok, _sup} = Gnat.ConsumerSupervisor.start_link(consumer_supervisor_settings)
IO.puts("Started consumer supervisor")
This publishes on the topic on which our consumer supervisor is listening.
Gnat.pub(:gnat, "rpc.demo", "hello")
Output
* creating /root/.mix/archives/hex-2.0.6
Resolving Hex dependencies...
Resolution completed in 0.042s
New:
cowlib 2.12.1
ed25519 1.4.1
gnat 1.7.1
jason 1.4.1
nimble_parsec 1.3.1
nkeys 0.2.2
telemetry 1.2.1
* Getting gnat (Hex package)
* Getting jason (Hex package)
* Getting cowlib (Hex package)
* creating /root/.mix/elixir/1-15/rebar3
* Getting nimble_parsec (Hex package)
* Getting nkeys (Hex package)
* Getting telemetry (Hex package)
* Getting ed25519 (Hex package)
You have added/upgraded packages you could sponsor, run `mix hex.sponsor` to learn more
==> ed25519
Compiling 2 files (.ex)
Generated ed25519 app
==> nkeys
Compiling 2 files (.ex)
Generated nkeys app
==> nimble_parsec
Compiling 4 files (.ex)
Generated nimble_parsec app
===> Analyzing applications...
===> Compiling telemetry
==> jason
Compiling 10 files (.ex)
Generated jason app
===> Analyzing applications...
===> Compiling cowlib
==> gnat
Compiling 11 files (.ex)
Generated gnat app
Manual subscription received: 'NATS by example is a great learning resource!'
10:49:27.682 [debug] connecting to %{port: 4222, host: "nats"}
Started consumer supervisor
Received message on 'rpc.demo': 'hello'
The Mix.install call at the top of the script sets up its dependencies. Ordinarily you would declare this set of dependencies in your
mix.exs file.