1 of 17

Building a distributed message processing system in Go using NSQ

Slides at bit.ly/nsqslides. Some content from nsq.io.

Greg Bray (@GBrayUT on Twitter)

Edge Platform Operations @WalmartLabs

2 of 17

Goal: Process logs in a complex environment

Logs and other messages are produced by:

  • Many servers across many different locations
  • For different purposes:
    • Web logs
    • System logs
    • Security logs
    • Telemetry data
    • Etc …
  • From different sources
    • Direct from various services
    • Log files (stdout / stderr)
    • syslog

Need a way to aggregate and process logs efficiently, with flexibility to meet fluid requirements

Destinations:

  • Long-term storage (Kafka to HDFS)
  • Search backend (ELK stack)
  • Other pipelines (NSQ, syslog, etc.)
  • Monitoring system (Prometheus metrics)
  • Ad hoc troubleshooting
  • /dev/null (emergency overflow)

Some destinations need filtering or prioritization of data streams.

Some data streams have other requirements like encryption, low latency, etc...


3 of 17

4 of 17

5 of 17

6 of 17

Topics, Channels, and Consumers

  • Top level: Topics are streams of data
  • Split topics into Channels
    • Each channel gets a copy of all messages
  • Channels can have one or more Consumers
    • Consumers receive messages from a channel and must FIN (finish) or REQ (re-queue) each message they take
    • Configurable timeout for automatic re-queueing
    • Scale out: more consumers for more throughput
    • Can be local or remote (basic discovery via lookupd)
    • A consumer may just filter messages and publish them into another Topic on a local or remote system
  • Topics and Channels are created at runtime: just start publishing/subscribing (automatic cleanup if the name ends in #ephemeral)
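
The fan-out rule in the bullets above (every channel gets its own copy, consumers on one channel share the load) can be illustrated with a toy in-memory model. This is only a sketch using Go channels from the standard library: the `Topic` type and its methods are hypothetical names invented for illustration, not nsqd's implementation and not the go-nsq client API. It is also not safe for concurrent use, and `Publish` would block once a channel's small buffer fills.

```go
package main

import "fmt"

// Topic is a toy in-memory stand-in for an NSQ topic (hypothetical type,
// not the real nsqd data structure).
type Topic struct {
	channels map[string]chan string
}

// NewTopic returns a topic with no channels yet.
func NewTopic() *Topic {
	return &Topic{channels: make(map[string]chan string)}
}

// Channel returns the named channel queue, creating it on first use,
// which mirrors channels appearing at runtime on first subscribe.
func (t *Topic) Channel(name string) chan string {
	ch, ok := t.channels[name]
	if !ok {
		ch = make(chan string, 16) // small buffer, enough for this demo
		t.channels[name] = ch
	}
	return ch
}

// Publish copies msg to every channel of the topic.
func (t *Topic) Publish(msg string) {
	for _, ch := range t.channels {
		ch <- msg
	}
}

func main() {
	logs := NewTopic()
	archive := logs.Channel("archive")
	search := logs.Channel("search")

	logs.Publish("GET /index.html 200")
	logs.Publish("GET /missing 404")

	// Each channel holds its own copy of both messages.
	fmt.Println(len(archive), len(search)) // prints "2 2"

	// Two consumers on the same channel split its messages between them.
	consumerA, consumerB := <-search, <-search
	fmt.Println(consumerA != consumerB) // prints "true"
}
```

Adding a third channel costs one more copy per message, while adding a consumer to an existing channel only divides that channel's work.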

NSQ is a good example of a high-performance system written in Go. See internals at https://nsq.io/overview/internals.html

7 of 17

Lookupd: nsqd registers topics and channels

8 of 17

Lookupd: Consumers query for nodes/topics
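
As a rough sketch of what a consumer does with a lookup result: nsqlookupd answers `GET /lookup?topic=<name>` with JSON listing the nsqd nodes that produce the topic. The code below parses such a body into TCP addresses. It assumes the NSQ 1.0+ response shape (older versions wrap the payload in a `data` field), and the hostnames in the sample are made up. `ProducerAddrs` is a hypothetical helper, not part of any NSQ library.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"strconv"
)

// lookupResponse models only the subset of the /lookup reply used here.
type lookupResponse struct {
	Producers []struct {
		BroadcastAddress string `json:"broadcast_address"`
		TCPPort          int    `json:"tcp_port"`
	} `json:"producers"`
}

// ProducerAddrs extracts host:port TCP addresses of the nsqd nodes
// listed in a /lookup response body.
func ProducerAddrs(body []byte) ([]string, error) {
	var resp lookupResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(resp.Producers))
	for _, p := range resp.Producers {
		addrs = append(addrs, net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort)))
	}
	return addrs, nil
}

func main() {
	// Sample body in the assumed shape of GET /lookup?topic=topicA.
	body := []byte(`{"channels":["archive"],"producers":[
		{"broadcast_address":"nsqd01","tcp_port":4150,"http_port":4151},
		{"broadcast_address":"nsqd02","tcp_port":4150,"http_port":4151}]}`)
	addrs, err := ProducerAddrs(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(addrs) // prints "[nsqd01:4150 nsqd02:4150]"
}
```

A real consumer would repeat this query periodically against every configured lookupd and connect to the union of the results.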

9 of 17

Redundancy… eliminate Single Point of Failure

10 of 17

Messages usually stored in memory (but overflow to disk)
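
The memory-first, overflow-second behaviour maps naturally onto a Go `select` with a `default` branch. The sketch below is a simplified illustration of that idea, not nsqd's actual code: the `Queue` type is hypothetical and a plain slice stands in for the disk-backed queue.

```go
package main

import "fmt"

// Queue keeps messages in a bounded in-memory buffer and spills the rest
// to an overflow store. The slice stands in for a disk-backed queue.
type Queue struct {
	memory   chan string
	overflow []string
}

// NewQueue returns a queue that holds up to memSize messages in memory.
func NewQueue(memSize int) *Queue {
	return &Queue{memory: make(chan string, memSize)}
}

// Put stores msg in memory when there is room and overflows otherwise.
func (q *Queue) Put(msg string) {
	select {
	case q.memory <- msg:
		// Fast path: the message fits in the in-memory buffer.
	default:
		// Buffer full: spill to the overflow store instead of blocking.
		q.overflow = append(q.overflow, msg)
	}
}

func main() {
	q := NewQueue(2)
	for _, m := range []string{"a", "b", "c", "d", "e"} {
		q.Put(m)
	}
	fmt.Println(len(q.memory), len(q.overflow)) // prints "2 3"
}
```

Because the `default` branch never blocks, a slow consumer raises disk usage rather than stalling publishers.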

11 of 17

12 of 17

Example of stats output. Also using TLS with client certificates to secure access.

13 of 17

Utilities included in the NSQ codebase:

  • nsq_pubsub - expose a pubsub like HTTP interface to topics in an NSQ cluster
  • nsq_stat - polls /stats for all the producers of the specified topic/channel and displays aggregate stats
  • nsq_tail - consumes the specified topic/channel and writes to stdout (in the spirit of tail)
  • nsq_to_file - consumes the specified topic/channel and writes out to a newline delimited file, optionally rolling and/or compressing the file.
  • nsq_to_http - consumes the specified topic/channel and performs HTTP requests (GET/POST) to the specified endpoints.
  • nsq_to_nsq - consumes the specified topic/channel and re-publishes the messages to destination nsqd
  • to_nsq - takes a stdin stream and splits on newlines for re-publishing to destination nsqd

More details including command line arguments at https://nsq.io/components/utilities.html

14 of 17

ARGS="--cacert $HOME/ca.crt --key $HOME/nsqd.key --cert $HOME/nsqd.crt -lookupd-http-address lookupd01:4161 -lookupd-http-address lookupd02:4161"

nsq_to_nsq $ARGS -destination-nsqd-tcp-address=localhost:4150 -topic topicA -destination-topic topicA-Aggregated

15 of 17

Other utilities we’ve built in Go

  • file2nsq - watches files on disk and generates NSQ messages
  • nsq2kafka - send messages from specific topics to various Kafka brokers
  • nsqarchive - similar to nsq_tail but outputs a tar file with one entry per message
  • nsq2es - send messages to Elasticsearch (can replace the ELK stack with an ENK stack)
  • nsqcopy - replaces multiple nsq_to_nsq instances with a single dynamic service using lookupd and a config file for topic source/destinations
  • Also tools for decoding encrypted messages, or generating metrics directly from topics/channels or a filtered topic stream

Each of the above is a simple Go program, usually a few hundred lines long. They run on two “log transport” servers in each data center to aggregate logs from all local servers.

16 of 17

Haven’t yet found a breaking point for nsqd (other than running out of disk space)

17 of 17

Any questions?

If this sounds like an interesting problem, you should come help us solve it!

Hiring developers and DevOps engineers who are familiar with Go and interested in “Industrial Grade” Internet / Websites.

Some other interesting systems we work on (all in Go):

  • Edge Compute / FaaS platform (Lua and Go plugins)
  • High-performance HTTP / HTTP/2 / QUIC Proxies and Load Balancers
  • Internal GSLB (Proximity aware DNS based load balancing)
  • External GeoIP / Policy based DNS load balancing
  • Solving complex problems at large scale (PCI)
  • CSS/HTML/JSON/Image optimizations
  • Running an international CDN
  • Creating metrics / dashboards / tools to help thousands of developers find large and small needles in a very large haystack
