NATS by Example

Core Publish-Subscribe in Messaging

This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:

  • Delivery is at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
  • There are two circumstances when a published message won’t be delivered to a subscriber:
    • The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
    • There is a network interruption where the message is ultimately dropped
  • Messages are published to subjects, which consist of one or more concrete tokens, e.g. greet.bob. Subscribers can use wildcards to express interest in a set of matching subjects.
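
To make the wildcard behavior concrete, here is a minimal sketch of subject matching. SubjectMatches is a hypothetical helper written for illustration and is not part of NATS.Client.Core; the server performs the real matching. It assumes the standard NATS rules: * matches exactly one token, and > matches one or more trailing tokens and must be the last token in the pattern.

```csharp
using System;

// Hypothetical helper (not a NATS.Client.Core API): returns true when a
// subscription pattern matches a concrete subject.
static bool SubjectMatches(string pattern, string subject)
{
    var p = pattern.Split('.');
    var s = subject.Split('.');

    for (var i = 0; i < p.Length; i++)
    {
        if (p[i] == ">")
        {
            // '>' must be the final pattern token and needs at least one
            // subject token left to match.
            return i == p.Length - 1 && s.Length > i;
        }

        if (i >= s.Length)
        {
            return false; // the subject ran out of tokens
        }

        if (p[i] != "*" && p[i] != s[i])
        {
            return false; // literal token mismatch
        }
    }

    // Without a trailing '>', token counts must be equal.
    return p.Length == s.Length;
}

Console.WriteLine(SubjectMatches("greet.*", "greet.bob"));       // True
Console.WriteLine(SubjectMatches("greet.*", "greet.bob.hello")); // False
Console.WriteLine(SubjectMatches("orders.>", "orders.new.1"));   // True
Console.WriteLine(SubjectMatches("orders.>", "orders"));         // False
```

This is why a subscription on orders.> would receive messages published to subjects such as orders.new.1.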
$ nbe run messaging/pub-sub/dotnet2

Code

Install NATS.Client.Core from NuGet.

using NATS.Client.Core;

The NATS_URL environment variable can be used to pass the locations of the NATS servers. If it is not set, the example falls back to a local server at 127.0.0.1:4222.

var url = Environment.GetEnvironmentVariable("NATS_URL") ?? "127.0.0.1:4222";
Console.WriteLine($"Connecting to {url}...");

Connect to the NATS server. The connection is declared with await using, so it is disposed at the end of our scope, which should flush our buffers and close the connection cleanly.

var opts = NatsOpts.Default with { Url = url };
await using var nats = new NatsConnection(opts);

Subscribe to a subject and start waiting for messages in the background.

await using var sub = await nats.SubscribeAsync<Order>("orders.>");

Console.WriteLine("[SUB] waiting for messages...");
var task = Task.Run(async () =>
{
    await foreach (var msg in sub.Msgs.ReadAllAsync())
    {
        var order = msg.Data;
        Console.WriteLine($"[SUB] received {msg.Subject}: {order}");
    }

    Console.WriteLine("[SUB] unsubscribed");
});

Let’s publish a few orders.

for (int i = 0; i < 5; i++)
{
    Console.WriteLine($"[PUB] publishing order {i}...");
    await nats.PublishAsync($"orders.new.{i}", new Order(OrderId: i));
    await Task.Delay(1_000);
}

We can unsubscribe now that all orders are published. Unsubscribing or disposing the subscription should complete the message loop and exit the background task cleanly.

await sub.UnsubscribeAsync();
await task;
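
The way the message loop ends can be illustrated without a NATS server. The sketch below assumes that sub.Msgs behaves like a System.Threading.Channels.ChannelReader, as the ReadAllAsync call above suggests. A plain Channel<int> stands in for the subscription, and completing its writer plays the role of UnsubscribeAsync; both substitutions are assumptions made for illustration, not a description of the library's internals.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Stand-in for sub.Msgs: an unbounded in-memory channel.
var channel = Channel.CreateUnbounded<int>();
var received = 0;

var reader = Task.Run(async () =>
{
    // ReadAllAsync yields items until the channel is marked complete
    // and every buffered item has been consumed.
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        received++;
        Console.WriteLine($"received {item}");
    }

    Console.WriteLine("loop completed");
});

for (var i = 0; i < 3; i++)
{
    await channel.Writer.WriteAsync(i);
}

// Stand-in for UnsubscribeAsync: no more items will be written.
channel.Writer.Complete();

// The background task drains the remaining items and then exits.
await reader;
```

Awaiting the task after signalling completion mirrors the await task line above: it ensures every buffered message is processed before the program moves on.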

That’s it! We saw how we can subscribe to a subject and publish messages that would be seen by the subscribers based on matching subjects.

Console.WriteLine("Bye!");

public record Order(int OrderId);

Output

Connecting to nats://nats:4222...
[SUB] waiting for messages...
[PUB] publishing order 0...
[SUB] received orders.new.0: Order { OrderId = 0 }
[PUB] publishing order 1...
[SUB] received orders.new.1: Order { OrderId = 1 }
[PUB] publishing order 2...
[SUB] received orders.new.2: Order { OrderId = 2 }
[PUB] publishing order 3...
[SUB] received orders.new.3: Order { OrderId = 3 }
[PUB] publishing order 4...
[SUB] received orders.new.4: Order { OrderId = 4 }
[SUB] unsubscribed
Bye!
