NATS by Example

Multi-Stream Consumption (legacy) in JetStream

Some use cases call for a fan-in of messages across streams. One way to achieve this is to create a push consumer per stream and give each one the same DeliverSubject and, optionally, the same DeliverGroup. Each consumer then delivers its messages to the clients subscribed to that subject or, when a DeliverGroup is set, to the members of that queue group.

This example will demonstrate how to configure the consumers and subscription to achieve this fan-in consumption.
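Before the JetStream code, the fan-in idea can be sketched in memory with only the Go standard library. This is an analogy, not NATS code: the `message` type and the `fanIn` function are hypothetical names, a shared channel stands in for the common deliver subject, one goroutine per stream stands in for each push consumer, and the worker goroutines stand in for members of a queue group.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a JetStream message.
type message struct {
	stream string // name of the stream the message came from
	seq    int    // sequence number within that stream
}

// fanIn models one push consumer per stream, all delivering to the same
// channel, which plays the role of the shared deliver subject. It returns
// how many messages were received from each stream.
func fanIn(streams map[string]int, workers int) map[string]int {
	deliver := make(chan message)

	// One "consumer" goroutine per stream forwards that stream's messages.
	var producers sync.WaitGroup
	for name, n := range streams {
		producers.Add(1)
		go func(name string, n int) {
			defer producers.Done()
			for seq := 1; seq <= n; seq++ {
				deliver <- message{stream: name, seq: seq}
			}
		}(name, n)
	}

	// Close the shared channel once every stream has been drained.
	go func() {
		producers.Wait()
		close(deliver)
	}()

	// Workers compete for messages, so each message reaches exactly one
	// of them, as it would within a queue group.
	var mu sync.Mutex
	received := make(map[string]int)
	var group sync.WaitGroup
	for w := 0; w < workers; w++ {
		group.Add(1)
		go func() {
			defer group.Done()
			for m := range deliver {
				mu.Lock()
				received[m.stream]++
				mu.Unlock()
			}
		}()
	}
	group.Wait()
	return received
}

func main() {
	got := fanIn(map[string]int{"EVENTS-EU": 3, "EVENTS-US": 3}, 2)
	fmt.Println(got["EVENTS-EU"], got["EVENTS-US"]) // prints "3 3"
}
```

The order in which messages from the two streams interleave is not defined in this model, only the per-stream totals are.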

$ nbe run jetstream/multi-stream-consumption/go


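package main

import (
	"fmt"
	"os"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Use the URL from the environment, falling back to the default.
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = nats.DefaultURL
	}

	nc, _ := nats.Connect(natsURL)
	defer nc.Drain()

	js, _ := nc.JetStream()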

Create a stream for each region.

	js.AddStream(&nats.StreamConfig{
		Name:     "EVENTS-EU",
		Subjects: []string{">"},
	})

	js.AddStream(&nats.StreamConfig{
		Name:     "EVENTS-US",
		Subjects: []string{">"},
	})

Create a push consumer for each stream. Both deliver to the same subject, which is a straightforward way to achieve fan-in within a single account. It is recommended to create a dedicated user with permission to subscribe to this subject.

	js.AddConsumer("EVENTS-EU", &nats.ConsumerConfig{
		Durable:        "processor",
		DeliverSubject: "",
		DeliverGroup:   "processor",
		AckPolicy:      nats.AckExplicitPolicy,
	})

	js.AddConsumer("EVENTS-US", &nats.ConsumerConfig{
		Durable:        "processor",
		DeliverSubject: "",
		DeliverGroup:   "processor",
		AckPolicy:      nats.AckExplicitPolicy,
	})

Publish messages to each stream.

	js.Publish("", nil)
	js.Publish("", nil)
	js.Publish("", nil)
	js.Publish("", nil)
	js.Publish("", nil)
	js.Publish("", nil)

Subscribe to the deliver subject with a core NATS queue subscription. Observe that messages from both streams are received and can be acked.

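	sub, _ := nc.QueueSubscribeSync("", "processor")
	defer sub.Drain()

	for {
		msg, err := sub.NextMsg(time.Second)
		if err == nats.ErrTimeout {
			// Nothing arrived within a second, so stop receiving.
			break
		}

		// Acknowledge so the owning consumer records the delivery.
		msg.Ack()
	}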

Confirm the consumer state is updated.

	info1, _ := js.ConsumerInfo("EVENTS-EU", "processor")
	fmt.Printf("eu: last delivered: %d, num pending: %d\n", info1.Delivered.Stream, info1.NumPending)
	info2, _ := js.ConsumerInfo("EVENTS-US", "processor")
	fmt.Printf("us: last delivered: %d, num pending: %d\n", info2.Delivered.Stream, info2.NumPending)
}

eu: last delivered: 3, num pending: 0
us: last delivered: 3, num pending: 0
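Because both consumers deliver to one subject, a subscriber may want to know which stream a given message came from. In nats.go, `msg.Metadata()` reports the stream and consumer names, which it derives from the message's reply (ack) subject. The sketch below shows the idea with only the standard library. It is an illustration under assumptions, not the library's parser: `ackInfo` and `parseAckSubject` are hypothetical names, the sample subject and its timestamp are made up, and only the nine-token layout is handled, whereas newer servers may include additional tokens. Real code should call `msg.Metadata()` instead.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ackInfo holds the fields this sketch extracts from an ack reply subject.
type ackInfo struct {
	Stream     string
	Consumer   string
	StreamSeq  uint64
	NumPending uint64
}

// parseAckSubject assumes the nine-token layout:
// $JS.ACK.<stream>.<consumer>.<delivered>.<stream seq>.<consumer seq>.<timestamp>.<pending>
func parseAckSubject(reply string) (ackInfo, error) {
	tokens := strings.Split(reply, ".")
	if len(tokens) != 9 || tokens[0] != "$JS" || tokens[1] != "ACK" {
		return ackInfo{}, fmt.Errorf("unexpected ack subject: %q", reply)
	}
	streamSeq, err := strconv.ParseUint(tokens[5], 10, 64)
	if err != nil {
		return ackInfo{}, fmt.Errorf("stream sequence: %w", err)
	}
	pending, err := strconv.ParseUint(tokens[8], 10, 64)
	if err != nil {
		return ackInfo{}, fmt.Errorf("pending count: %w", err)
	}
	return ackInfo{
		Stream:     tokens[2],
		Consumer:   tokens[3],
		StreamSeq:  streamSeq,
		NumPending: pending,
	}, nil
}

func main() {
	// Hypothetical reply subject for the third message of EVENTS-EU.
	info, err := parseAckSubject("$JS.ACK.EVENTS-EU.processor.1.3.3.1700000000000000000.0")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s/%s stream seq=%d pending=%d\n",
		info.Stream, info.Consumer, info.StreamSeq, info.NumPending)
}
```

For the last message of each stream, the stream sequence and pending count carried in the subject correspond to the "last delivered" and "num pending" values reported above.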

