monitoring

What is JetStream ?

JetStream is the NATS company streaming solution. The JetStream server is built-in to nats-server and can be enabled with nats command options.

You can find more informations about JetStream on the official documentation : https://docs.nats.io/nats-concepts/jetstream.

Events streaming example

app example

To understand how to do events streaming with JetStream, we will use a fake Twitter app throughout the article.

The two application endpoints that will interest us here are:

  • POST /tweets
  • POST /tweets/:id/likes

The first one will create a new tweet and the second one permit to like a tweet.

As an event streaming application, the POST /tweets will produce tweet_created events and the POST /tweets/:id/likes will produce tweet_liked events.

JetStream glossary

  • Stream : A stream is a data chanel that can be used to publish messages and that can be listened by consumers.
  • Subject : The subject is what will be used to categorize the message. Each message is composed of a subject and a data payload (message content).

Run JetStream server (docker)

Run the following command to start a JetStream server.

docker run --network host -p 4222:4222 nats -js

The JetStream server is now listening on http://localhost:4222.

Managing JetStream with the Nats CLI

Install Nats CLI

brew tap nats-io/nats-tools
brew install nats-io/nats-tools/nats

Display streams

The nats stream ls command list all created streams.

➜  ~ nats stream ls
╭──────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                             Streams                                              │
├──────────────┬───────────────────────────┬─────────────────────┬──────────┬───────┬──────────────┤
│ Name         │ Description               │ Created             │ Messages │ Size  │ Last Message │
├──────────────┼───────────────────────────┼─────────────────────┼──────────┼───────┼──────────────┤
│ TWITTERCLONE │ Streaming article example │ 2022-01-07 22:01:27 │ 4836 B │ 10m15s       │
╰──────────────┴───────────────────────────┴─────────────────────┴──────────┴───────┴──────────────╯

Display messages in stream

The nats stream view <STREAM> command allows you to display messages stored in a stream.

You can also filter messages using a subject. To do that, add a --subject=<subject> to the command.

Go Project - Create a JetStream client

In the following example, we will use a stream TWITTERCLONE that will be used to publish tweet_created and tweet_liked subjects.

First, I create two variables to store my stream name and the subjects thats I will be used.

On my case, I have simply declared a wildcard to authorize any subject but you can be more precise by declaring all subjects with an array of string.

const (
  streamName     = "TWITTERCLONE"
  streamSubjects = "TWITTERCLONE.*"
)

Then, we need to create a Nats connection.

nc, _ := nats.Connect(nats.DefaultURL)

One the connection created, retrieve the JetStream context to create a new stream.

Create a new stream with the AddStream function from the JetStream instance.

This function take a nats.StreamConfig object as parameter.

js, _ := nc.JetStream()
_, err := js.AddStream(&nats.StreamConfig{  
	Name:     streamName,
	Subjects: []string{streamSubjects},
})

And thats it ! We now have a stream instance that can be used to publish and consume messages.

Publish messages

Now that we have a streaming instance, we can publish the first post.

Before that, create a struct that will represent the message object that will be published in the stream.

Here, I will use an Event struct that will represent my fake tweets events.

type Event struct {
  ID          string    `json:"id"`
  Message     string    `json:"message"`
  Date        time.Time `json:"date"`
  Author      string    `json:"author"`
}

Create a new tweet_created event entity and marshal it to JSON.

jsonEvent, _ := json.Marshal(Event {
  ID:          "1",
  Message:     "hello world !",
  Date:        time.Now(),
  Author:      "ziggornif",
})

We can now publish the tweet_created event message to the stream.

Use the JetStream instance Publish function to publish the message to the stream.

ack, err := js.Publish("TWITTERCLONE.tweet_created", jsonEvent)
if err != nil {
  log.Println(err)
}
log.Printf("Event has been published %v\n", ack.Sequence)

The message is now stored in the TWITTERCLONE stream and can be recieved by the stream consummers.

Listen messages

Our app is now able to post messages to our stream. We can now set up a message listener.

To simplify the example, we will use the JetStream instance created previously for the publisher.

JetStream allows you to subscribe to a stream using the Subscribe method.

The Subscribe function take a subject name as first parameter.

💡 Tips : You can use a wildcard (ex: TWITTERCLONE.*) to listen all the stream subjects with one listener.

The second parameter is the function that will be called each time the listener receive a message.

This function takes a single input parameter which is a nats message.

The content of the message is available in the Data attribute in byte array format (JSON object).

To be able to read it, use the json.Unmarshal to parse the message content in a Go structure.

js.Subscribe("TWITTERCLONE.*", func(msg *nats.Msg) {
  msg.Ack()
  var event Event
  err := json.Unmarshal(msg.Data, &event)
  fmt.Println(event)
}, nats.Durable("go-event-subscriber"), nats.ManualAck())

⚠️ In a streaming messaging system like JetStream, you must acknowledge every message upon receipt.

If you don’t, the consumer cursor will not be positioned on the next message and you will be stuck on the same message.

The messages are acknowledged using the Ack() method present in the received message object.

Complete example

You can retrieve and fork the complete project from here : https://gitlab.com/ziggornif/go-event-streaming

Demonstration :

result