{Josh Rendek}

<3 Ruby & Go

Understanding ElasticSearch Performance

Performance before and after Optimizations

When working with billions of documents in your Elasticsearch cluster, there are a few important things to keep in mind:

  • Look at what the big players do (Elasticsearch/Kibana) for organization and planning
  • Experiment with index sizes that make sense for your business; don’t just assume 1 index for a billion documents is a good idea (even if you have N shards)
  • Understand which metrics to monitor when you are performance testing your cluster
  • Monitor all points of ingestion: Elasticsearch, Load balancers (ELB, HAProxy, etc), and your application code that is inserting

What do the big players do?

Split by date ranges. Based on your data, decide whether daily, weekly, or even monthly splits are best for your dataset. Elasticsearch recommends not going over 30-32G per shard based on current JVM memory recommendations. The reason they recommend staying below 32G of RAM per shard is that beyond that point the JVM uses uncompressed pointers, which means internal pointers go from 4 bytes to 8 bytes. Depending on your memory size, this can decrease the available heap and increase GC times in the JVM.

Don’t allocate more than 50% of your system memory for the JVM. Your kernel will cache files and help keep performance up. Over-allocating the JVM can lead to poor performance from the underlying engine, Lucene, which relies on the OS cache as well as the JVM to do searches.

Understand your users: other devs, other systems, etc. Don’t do deep pagination; use scan and scroll instead. Turn on slow logging to find any queries doing this or returning too many points of data per query.

Index Sizing and Memory

Keeping in mind the 30-32G per shard recommendation, this will determine the number of shards per dataset. Remember that the number of shards cannot be changed after an index is created, but the number of replicas can. Shards will increase indexing performance, while replicas will increase search performance.

Overwhelmed and can’t figure out what to do? Just start with an index and see how things go. Using aliases you can create another index later on and map both of them together for searching (and eventually delete the old one if the data expires). If you start out using aliases, transitions can be seamless (no need to redeploy to point to the new alias/index name).
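As a rough illustration, here is a minimal Go sketch that builds the JSON body for Elasticsearch's `_aliases` endpoint, adding a new index to an alias and optionally removing an old one in the same request. The alias and index names (`logs`, `logs-2015.02`) and the helper name `aliasSwapBody` are made up for this example, and actually POSTing the body to your cluster is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// aliasTarget names the index/alias pair an action applies to.
type aliasTarget struct {
	Index string `json:"index"`
	Alias string `json:"alias"`
}

// aliasAction is one entry in the "actions" array of an _aliases request.
type aliasAction struct {
	Add    *aliasTarget `json:"add,omitempty"`
	Remove *aliasTarget `json:"remove,omitempty"`
}

// aliasSwapBody builds the body that adds newIndex to alias and, when
// oldIndex is non-empty, removes oldIndex from it in the same request.
func aliasSwapBody(alias, newIndex, oldIndex string) (string, error) {
	actions := []aliasAction{{Add: &aliasTarget{Index: newIndex, Alias: alias}}}
	if oldIndex != "" {
		actions = append(actions, aliasAction{Remove: &aliasTarget{Index: oldIndex, Alias: alias}})
	}
	body, err := json.Marshal(map[string][]aliasAction{"actions": actions})
	return string(body), err
}

func main() {
	// Hypothetical names: the "logs" alias gains a second, newer index.
	body, err := aliasSwapBody("logs", "logs-2015.02", "")
	if err != nil {
		panic(err)
	}
	fmt.Println(body)
}
```

Because searches go through the alias, the application never needs to know which concrete indexes sit behind it.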

Metrics to monitor

Use the plugin community to monitor your cluster: ElasticHQ, BigDesk, Head and Paramedic.

Watch for refresh/merge/flush time (ElasticHQ makes this available under Node Diagnostics). For example, with a large index (1TB) that has frequent updates or deletions, a merge must be performed before the data is actually freed from the disk and the cluster. When the number of segments in a cluster gets too large, this can cause issues for refreshing and merging.

The basic idea is the larger your index, the more segments, and the more optimization steps that need to be performed. Automatic flushes happen every few seconds so more segments get created - as you can imagine this gets compounded the larger your index is. You can see a full rundown of how deleting and updating works in the documentation.

By separating our indexes into smaller datasets (by day, week, or month) we can eliminate some of the issues that pop up. For example, a large number of segments can cause search performance issues until an optimize command is run (which in itself can cause high IO and make your search unavailable). By reducing the data we reduce the time these operations can take. We also end up at a point where no new data is inserted into the old indexes, so no further optimizations need to be done on them, only on new indexes. Any activity on the old indexes should then only come from searching, which reduces the IO requirements on the cluster for those shards/indexes.

This also greatly simplifies purging old data. Instead of having to have the cluster do merges and optimizations when we remove old documents, we can just delete the old indexes and remove them from the aliases. This will also reduce the IO overhead on your cluster.
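To make the purge step concrete, here is a small Go sketch that works out which daily indexes have aged out of a retention window. The `logs-` prefix, the `YYYY.MM.DD` naming scheme, and the 30-day retention are assumptions for illustration; each resulting name would then be dropped with a single index delete rather than a delete-by-query.

```go
package main

import (
	"fmt"
	"time"
)

// indexName returns a daily index name such as "logs-2015.01.31".
// The date layout is an assumption for this sketch.
func indexName(prefix string, day time.Time) string {
	return prefix + day.UTC().Format("2006.01.02")
}

// expiredIndexes lists the daily indexes just outside the retention
// window, looking `lookback` days past the cutoff.
func expiredIndexes(prefix string, now time.Time, retentionDays, lookback int) []string {
	var names []string
	cutoff := now.AddDate(0, 0, -retentionDays)
	for i := 1; i <= lookback; i++ {
		names = append(names, indexName(prefix, cutoff.AddDate(0, 0, -i)))
	}
	return names
}

// exampleExpired runs the calculation for a fixed date so the result is stable.
func exampleExpired() []string {
	now := time.Date(2015, 3, 10, 0, 0, 0, 0, time.UTC)
	return expiredIndexes("logs-", now, 30, 3)
}

func main() {
	for _, name := range exampleExpired() {
		// Each of these would be removed from the alias and deleted whole.
		fmt.Println(name)
	}
}
```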

Monitoring Ingestion

Watch your ELB response time - is it spiking? Check flush, merge, and indexing times.

Add logging to your posts to understand how long each bulk insert is taking. Play with bulk sizes to see what works best for your document/datasize.
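A minimal sketch of that kind of logging in Go might look like the following. The `timedBulk` wrapper and the `fakeInsert` function are hypothetical; in a real application the insert function would be whatever performs your HTTP call to the bulk endpoint.

```go
package main

import (
	"fmt"
	"log"
	"time"
)

// timedBulk runs one bulk insert and logs the batch size next to the time
// it took, so different bulk sizes can be compared afterwards.
func timedBulk(docs []string, insert func([]string) error) (time.Duration, error) {
	start := time.Now()
	err := insert(docs)
	elapsed := time.Since(start)
	log.Printf("bulk insert: docs=%d took=%s err=%v", len(docs), elapsed, err)
	return elapsed, err
}

func main() {
	// fakeInsert stands in for the real bulk request.
	fakeInsert := func(docs []string) error {
		time.Sleep(10 * time.Millisecond)
		return nil
	}
	elapsed, err := timedBulk(make([]string, 500), fakeInsert)
	fmt.Println(elapsed >= 10*time.Millisecond, err)
}
```

Graphing the logged durations against the batch size makes it much easier to spot the point where bigger batches stop helping.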

When moving from a single large index to aliased indexes, insertion times went from 500ms-1.5s+ to 50ms on average. Our daily processes that were taking half a day to complete now finish in less than 15 minutes.

Processing 5k log lines per minute? Now we’re processing over 6 million.

Taking the time to understand your database and how each part of it works can be worth the effort, especially if you’re looking for performance gains.

Building a Distributed WaitGroup With Go and Redis

If you’ve done any concurrency work in Go you’ve used WaitGroups. They’re awesome!

Now let’s say you have a bunch of workers that do some stuff, but at some point they all need to hit a single API that you’re rate limited against.

You could move to just using a single process and limiting it that way, but that doesn’t scale out very well.

While there are quite a few distributed lock libraries in Go, I didn’t find any that worked similarly to WaitGroups, so I set out to write one.

( If you just want the library, head on over to Github https://github.com/joshrendek/redis-rate-limiter )

Design goals:

  • Prevent deadlocks
  • Hard limit on concurrency (don’t accidentally creep over)
  • Keep it simple to use
  • Use redis
  • Keep the design similar to sync.WaitGroup by using Add() and Done()

Initially I started off using INCR/DECR with WATCH. This somewhat worked but was causing the bucket to overflow and go above the limit I defined.

Eventually I found the SETNX command and decided using a global lock with that around adding was the way to go.

So the final design goes through this flow for Add():

  1. Use SETNX to check if a key exists; loop until it doesn’t error (aka the lock is available for acquiring)
  2. Immediately add an expiration to the lock key once acquired so we don’t deadlock
  3. Check the current number of workers running; wait until it is below the max rate
  4. Generate a uuid for the worker lock, use this to SET a key and also add to a worker set
  5. Set an expiration on the worker lock key based on uuid so the worker doesn’t deadlock
  6. Unlock the global lock from SETNX by deleting the key
  7. Clean old, potentially locked workers

Removing is much simpler with Done():

  1. Delete the worker lock key
  2. Remove the worker lock from the worker set

For (1) we want to make sure we don’t hammer Redis or the CPU - so we make sure we can pass an option for a sleep duration while busy-waiting.

(2) Prevents the global lock from stalling out if a worker is cancelled in the middle of a lock acquisition.

Waiting for workers in (3) is done by making sure the cardinality ( SCARD ) of the worker set is less than the worker limit. We loop and wait until this count goes down so we don’t exceed our limit.

(4) and (5) use a UUID library to generate a unique id for the worker lock name/value. This gets added via SADD to the wait group worker set and is also set as a key. We set the key with a TTL based on the UUID so we can remove it from the set via another method if it no longer exists.

(6) frees the global lock allowing other processes to acquire it while they wait in (1).

To clear old locks in (7) we need to take the members in the worker set and then query with EXISTS to see if the key still exists. If it doesn’t exist but it is still in the set, we know something bad happened. At this point we need to remove it from the worker set so that the slot frees up. This will prevent worker deadlocks from happening if it fails to reach the Done() function.

The Add() function returns a UUID string that you then pass to Done(uuid) to remove the worker locks. I think this was the simplest approach for doing this; however, if you have other ideas let me know!
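The flow above can be sketched in a few dozen lines of Go. To be clear, this is not the library's code: `fakeRedis` is a single-goroutine, in-memory stand-in so the example runs without a Redis server, the key names and the `worker-N` ids (instead of real UUIDs) are invented for the sketch, and the comments only name the Redis command each step would map to.

```go
package main

import (
	"fmt"
	"time"
)

// fakeRedis is an in-memory stand-in for Redis (not safe for concurrent use).
type fakeRedis struct {
	keys    map[string]time.Time // key -> expiry (SET/SETNX + EXPIRE)
	workers map[string]bool      // the worker set (SADD/SREM/SCARD)
}

// exists mimics EXISTS, treating expired keys as missing.
func (r *fakeRedis) exists(key string) bool {
	exp, ok := r.keys[key]
	return ok && time.Now().Before(exp)
}

// setNX mimics SETNX followed by EXPIRE.
func (r *fakeRedis) setNX(key string, ttl time.Duration) bool {
	if r.exists(key) {
		return false
	}
	r.keys[key] = time.Now().Add(ttl)
	return true
}

// group is a hypothetical wait group built on the stand-in above.
type group struct {
	r     *fakeRedis
	limit int           // maximum concurrent workers
	ttl   time.Duration // expiry for the global and worker locks
	sleep time.Duration // pause between busy-wait attempts
	seq   int           // stand-in for a UUID generator
}

func newGroup(limit int) *group {
	r := &fakeRedis{keys: map[string]time.Time{}, workers: map[string]bool{}}
	return &group{r: r, limit: limit, ttl: time.Minute, sleep: time.Millisecond}
}

// Add blocks until a slot is free and returns the id to pass to Done.
func (g *group) Add() string {
	for !g.r.setNX("lock", g.ttl) { // steps 1-2: global lock with a TTL
		time.Sleep(g.sleep)
	}
	for len(g.r.workers) >= g.limit { // step 3: SCARD must be below the limit
		time.Sleep(g.sleep)
	}
	g.seq++
	id := fmt.Sprintf("worker-%d", g.seq)
	g.r.keys[id] = time.Now().Add(g.ttl) // steps 4-5: SET worker key with a TTL
	g.r.workers[id] = true               // step 4: SADD to the worker set
	delete(g.r.keys, "lock")             // step 6: DEL the global lock
	g.cleanStale()                       // step 7
	return id
}

// cleanStale drops set members whose worker key no longer exists.
func (g *group) cleanStale() {
	for id := range g.r.workers {
		if !g.r.exists(id) {
			delete(g.r.workers, id) // SREM
		}
	}
}

// Done releases the slot held by id.
func (g *group) Done(id string) {
	delete(g.r.keys, id)    // DEL the worker key
	delete(g.r.workers, id) // SREM from the worker set
}

func main() {
	g := newGroup(2)
	a, b := g.Add(), g.Add()
	fmt.Println(len(g.r.workers)) // both slots are taken
	g.Done(a)
	g.Done(b)
	fmt.Println(len(g.r.workers)) // all slots are free again
}
```

Against a real Redis the same steps would be network calls, and the TTLs are what keep a crashed worker from holding a slot forever.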

That’s it! We now have a distributed wait group written in go as a library. You can see the source and how to use it over at https://github.com/joshrendek/redis-rate-limiter.

Docker and Ping: Sendmsg: Operation Not Permitted

You’ve raised your file descriptor limits, updated security limits, tweaked your network settings and done everything else in preparation to launch your shiny new dockerized application.

Then you have performance issues and you can’t understand why, it looks to be network related. Alright! Let’s see what’s going on:

ping google.com
unknown host google.com

Maybe it’s DNS related... Let’s try again:

ping 8.8.8.8
ping: sendmsg: Operation not permitted

That’s odd, maybe it’s a networking issue outside of our servers. Let’s try pinging another host on the subnet:

ping 10.10.0.50
ping: sendmsg: Operation not permitted

That’s even more odd, our other host isn’t having network issues at all. Let’s try going the other way:

ping 10.10.0.49 # the bad host
# Lots of packet loss

We’re getting a lot of packet loss going from Host B to Host A (the problem machine). Maybe it’s a bad NIC?

Just for fun I decided to try and ping localhost/127.0.0.1:

ping 127.0.0.1
ping: sendmsg: Operation not permitted

That’s a new one. What the heck is going on? Now at this point I derped out and didn’t think to check dmesg. Let’s assume you went down the same road I did.

What’s the difference between host A and B? Well, host B doesn’t have docker installed!

apt-get remove docker-engine; reboot

# .... wait for reboot

ping 127.0.0.1
# working
ping 8.8.8.8
# working
ping google.com
# working

Now reinstall docker and ping again:

apt-get install docker-engine
ping 127.0.0.1
ping: sendmsg: Operation not permitted

ping 8.8.8.8
ping: sendmsg: Operation not permitted

Okay so it happens when docker is installed. We’ve isolated it. Kernel bug maybe? Cue swapping around kernels, and still the same issue happens.

Fun side note: Ubuntu 14.04 has a kernel bug that prevents booting into LVM or software raided grub. Launchpad Bug

Switching back to the normal kernel (3.13) that comes with 14.04, we proceed. Docker bug? Hit up #docker on Freenode. Someone mentions checking dmesg and conntrack information.

Lo-and-behold, dmesg has tons of these:

ip_conntrack: table full, dropping packet
# x1000

How does docker networking work? NAT! That means iptables needs to keep track of all your connections, hence the table full message.

If you google the original message you’ll see a lot of people telling you to check your iptables rules and ACCEPT/INPUT chains to make sure there isn’t anything funky in there. If we combine this knowledge + the dmesg errors, we now know what to fix.

Let’s update sysctl.conf and reboot for good measure (you could also apply the settings with sysctl -p, but I wanted to make sure everything was fresh).

net.ipv4.netfilter.ip_conntrack_tcp_timeout_established = 54000
net.netfilter.nf_conntrack_generic_timeout = 120
net.netfilter.nf_conntrack_max = 556000

Adjust the conntrack max until you hit a stable count (556k worked well for me) and don’t get any more connection errors. Start your shiny new docker application that makes tons of network connections and everything should be good now.
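To judge whether the limit is high enough, it helps to watch how full the table actually is. Here is a small Go sketch that reads the kernel's counters from `/proc/sys/net/netfilter/nf_conntrack_count` and `nf_conntrack_max` and prints the usage. It assumes a Linux host with the conntrack module loaded; the helper names are invented for this example.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// readCounter reads a single integer from a /proc file.
func readCounter(path string) (int, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return 0, err
	}
	return strconv.Atoi(strings.TrimSpace(string(raw)))
}

// usagePercent reports how full the conntrack table is.
func usagePercent(count, limit int) float64 {
	if limit <= 0 {
		return 0
	}
	return float64(count) / float64(limit) * 100
}

func main() {
	count, err := readCounter("/proc/sys/net/netfilter/nf_conntrack_count")
	if err != nil {
		fmt.Println("conntrack not available:", err)
		return
	}
	limit, err := readCounter("/proc/sys/net/netfilter/nf_conntrack_max")
	if err != nil {
		fmt.Println("conntrack not available:", err)
		return
	}
	fmt.Printf("conntrack: %d/%d (%.1f%% full)\n", count, limit, usagePercent(count, limit))
}
```

Shipping that percentage to your metrics system gives you a warning long before packets start getting dropped.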

Hope this helps someone in the future, as Google really didn’t have a lot of useful information on this message + Docker.

Influx Alert

I’ve been very happy using InfluxDB with Grafana + StatsD but always wanted a nice way to alert on some of the data being fed into statsd/grafana so I wrote a little tool in Go to accomplish that:

Github: https://github.com/joshrendek/influx-alert

I hope someone finds this useful! It’s got a few simple functions/comparisons done already and support for HipChat and Slack notifications.

Documentation

Influx Alert

This is a tool to alert on data that is fed into InfluxDB (for example, via statsd) so you can get alerted on it.

How to get it

Go to releases, or download the latest here: v0.1

How to Use

  • name: the name of the alert ( will be used in notifier )
  • interval: how often to check influxdb (in seconds)
  • timeshift: how far back to go (query is like: where time > now() - TIMESHIFT)
  • limit: the max number of results to return
  • type: influxdb (the only option for now)
  • function: min/max/average are the only supported functions for now
  • query: the influxdb query to run (omit any limit or where clause on the time)
  • trigger: the type of trigger and value that would trigger it
    • operator: gt/lt
    • value: value to compare against (note all values are floats internally)
  • notifiers: an array of notifiers, possible options are slack and hipchat

Example: ( see example.yml for more )

- name: Not Enough Foo
  type: influxdb
  function: average
  timeshift: 1h
  limit: 10
  interval: 10
  query: select * from "foo.counter"
  notifiers:
      - slack
      - hipchat
      - foobar
  trigger:
    operator: lt
    value: 10
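To illustrate how a rule like this could be evaluated, here is a hedged Go sketch: reduce the returned points with the configured function, then compare the result against the trigger. This is not taken from the influx-alert source; the function names `aggregate` and `triggered` and the sample values are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// aggregate applies one of the supported functions: min, max, or average.
func aggregate(function string, values []float64) (float64, error) {
	if len(values) == 0 {
		return 0, errors.New("no values to aggregate")
	}
	result := values[0]
	switch function {
	case "min":
		for _, v := range values[1:] {
			if v < result {
				result = v
			}
		}
	case "max":
		for _, v := range values[1:] {
			if v > result {
				result = v
			}
		}
	case "average":
		sum := 0.0
		for _, v := range values {
			sum += v
		}
		result = sum / float64(len(values))
	default:
		return 0, fmt.Errorf("unsupported function %q", function)
	}
	return result, nil
}

// triggered compares the aggregated value against the trigger value.
func triggered(operator string, got, value float64) (bool, error) {
	switch operator {
	case "gt":
		return got > value, nil
	case "lt":
		return got < value, nil
	}
	return false, fmt.Errorf("unsupported operator %q", operator)
}

func main() {
	// Hypothetical points returned for the "Not Enough Foo" query.
	avg, err := aggregate("average", []float64{4, 8, 12})
	if err != nil {
		panic(err)
	}
	fire, err := triggered("lt", avg, 10)
	if err != nil {
		panic(err)
	}
	fmt.Println(avg, fire) // prints "8 true", so the alert would notify
}
```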

Environment Variables

  * INFLUX_HOST
  * INFLUX_PORT (8086 is default)
  * INFLUX_DB
  * INFLUX_USER
  * INFLUX_PASS
  * SLACK_API_TOKEN
  * SLACK_ROOM
  * HIPCHAT_API_TOKEN
  * HIPCHAT_ROOM_ID
  * HIPCHAT_SERVER (optional)
  * DEBUG (optional)

Supported Notifiers

  • Slack
  • HipChat

Supported Backends

  • InfluxDB v0.9

Getting Upstart to Log to Syslog With Tags

I was setting up the ELK stack and had quite a fun time trying to get upstart to log to syslog WITH a log tag ( aka: my-application ) so it could be filtered inside Kibana.

Here is a working example for STDOUT and STDERR:

respawn
respawn limit 15 5

start on runlevel [2345]
stop on runlevel [06]

setuid app-user
setgid app-user

script
  # Redirect stdout to syslog
  mkfifo /tmp/app-stdout-fifo
  ( logger -p user.info -t your-app-tag </tmp/app-stdout-fifo & )
  exec 1>/tmp/app-stdout-fifo
  rm /tmp/app-stdout-fifo

  # Redirect stderr to syslog
  mkfifo /tmp/app-stderr-fifo
  ( logger -p user.err  -t your-app-tag </tmp/app-stderr-fifo & )
  exec 2>/tmp/app-stderr-fifo
  rm /tmp/app-stderr-fifo

  exec ./your-app-binary
end script

Hope this helps someone else; there are a lot of misleading and broken examples on Google & StackOverflow.

Golang Performance Tips

Below are some advice and notes that I wish I had when writing Go to deal with high volumes of requests (20k+/second). Have any extra tips? Leave them in the comments!

Kernel Tuning

Step 1 is making sure your host OS isn’t going to keel over when you start making thousands of requests/second or hammering the CPU.

Update /etc/sysctl.conf to have these lines:

net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 1
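# ip_local_port_range takes a low and a high port; this range (an illustrative choice) opens up roughly 50,000 ports
net.ipv4.ip_local_port_range = 15000 65000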

ip_local_port_range - with the default range of roughly 30,000 ports and without modifying the tw_reuse and tw_recycle properties, we’re effectively limited to about 500 connections/second to a server, since each closed connection holds its port in TIME-WAIT for 60 seconds (30,000 / 60 = 500). If this is still not enough you can configure additional IPs on the server and cycle between them.

tcp_tw_reuse will re-use an existing connection that is in TIME-WAIT for outgoing connections.

tcp_tw_recycle enables sockets to be recycled faster once they reach the TIME-WAIT state for both incoming and outgoing connections. Make sure you’re not running anything through a NAT or this can cause problems with connections. Note that this option was removed in Linux 4.12, so it only exists on older kernels.

Vincent Bernat has a great explanation with state diagrams on his blog.

Next up are file descriptors. I prefer defining these in the init or upstart scripts, so you would call ulimit -n 102400 and then call your Go binary in the upstart script; that way the limit is set before the binary runs. (Note: this will only work if the user has been properly given permissions to raise their limit in /etc/security/limits.d.)

Upstart also provides a mechanism to set file limits in the job stanza.

Golang Tuning

Utilizing all CPUs ( < Go 1.5 )

You can use all the goroutines in the world and still not use all your CPU cores and threads. In order to let your Go program utilize all operating-system level threads, we need to tell the Go runtime about them:

runtime.GOMAXPROCS(runtime.NumCPU())

This is no longer necessary as of Go 1.5 and is done automatically.

Finish what you start

Make sure you call .Close() on your response bodies, and make sure you read the entire body. The documentation for net/http/response explicitly says that “it is the caller’s responsibility to close Body” and that “neither ReadResponse nor Response.Write ever closes a connection.” net/http/response.go
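Here is a minimal sketch of that habit, using a local `httptest` server so it runs on its own. The helper names `fetchStatus` and `fetchFromTestServer` are made up for this example; the important part is draining the body to EOF and closing it, which lets the transport reuse the connection.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// fetchStatus performs a GET, drains the body, and closes it so the
// underlying connection can go back into the keep-alive pool.
func fetchStatus(client *http.Client, url string) (int, error) {
	resp, err := client.Get(url)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	// Read to EOF even when the payload itself is not needed.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return 0, err
	}
	return resp.StatusCode, nil
}

// fetchFromTestServer spins up a throwaway server to keep the example
// self-contained.
func fetchFromTestServer() (int, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "hello")
	}))
	defer srv.Close()
	return fetchStatus(srv.Client(), srv.URL)
}

func main() {
	status, err := fetchFromTestServer()
	fmt.Println(status, err)
}
```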

Don’t be intimidated

You want to do things fast! But you’re confused by all the options for concurrency in Go. Channels? Goroutines? Libraries to manage them? Stick with a simple worker pattern for best results. I’ve found that many libraries that claim to manage concurrency for you (limiting running routines, or providing some interface for queueing jobs) fall short, break, or fail to utilize all CPU cores.

Here is a simple worker pattern that uses nothing but the standard library:

tasks := make(chan someDataStruct, 40)
var wg sync.WaitGroup

for i := 0; i < 40; i++ {
  wg.Add(1)
  go func() {
      for data := range tasks {
          // do some work on data
      }
      wg.Done()
  }()
}

// Push to it like this:
tasks <- someData

// Finish like this
close(tasks)
wg.Wait()

First, we make a channel containing someDataStruct as the type to be sent/received over it. We give it a buffer size of 40. Since we only have 40 routines spinning up, no more than 40 can be worked on at once.

When a caller is trying to push data to this channel and all slots are full, it will block until a slot is free, so keep this in mind and change accordingly if you need to.

Next we make a WaitGroup which will wait for all of our goroutines to finish. When we loop 40 times and say wg.Add(1) we’re telling the WaitGroup that we’re expecting 40 goroutines, and to wait for them to finish.

Next we iterate over data coming in our tasks channel and do some process on it (this is obviously where your program specific logic or function calls go).

When no more data is available on the channel we call wg.Done() which tells the WaitGroup a routine has finished.

Pushing data is as simple as sending an instance of someDataStruct into the tasks channel.

Almost done! We now want to wait for everything to finish before our program exits. close(tasks) marks the channel as closed - and any other callers who try to send to it will trigger a panic.

Finally wg.Wait() says to wait until all 40 wg.Done()’s have been called.
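For reference, here is the same pattern as a complete program you can run. The work itself (summing squares) and the `sumSquares` name are just placeholders for your own logic; the mutex is only there because this toy example shares a single total between workers.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares fans the inputs out to a fixed number of workers and returns
// the sum of their squares.
func sumSquares(inputs []int, workers int) int {
	tasks := make(chan int, workers)
	var wg sync.WaitGroup
	var mu sync.Mutex
	total := 0

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range tasks { // exits once tasks is closed and drained
				mu.Lock()
				total += n * n
				mu.Unlock()
			}
		}()
	}

	for _, n := range inputs {
		tasks <- n // blocks while the buffer is full
	}
	close(tasks)
	wg.Wait()
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4}, 40)) // prints 30
}
```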

Errors

One of my favorite things about Go is that it’s fast, real fast. Make sure you test, test, and test some more! Always make sure you fail gracefully (if an HTTP connection failed and you need to re-process a job, for instance) and push jobs back onto their queues when a failure is detected. If you have an unexpected race condition or other errors (running out of file descriptors, etc) Go will very quickly churn through your job queue.

But what about…

There are lots of other considerations, like what you’re running this against. On small elasticsearch clusters using these patterns to send data from go daemons to ES, I’ve been able to hit 50k requests/second with still plenty of room to grow.

You may need to pay extra attention to what libraries you’re using: how many Redis connections can you have open? How many do you need?

Are you using keep-alive connections for HTTP? Is your receiver set up properly (nginx configs, etc)?

Is your MySQL or PostgreSQL server tuned to allow this many connections? Make sure you use connection pooling!

Lastly: Monitor all the things!

Send your data somewhere. I prefer StatsD, InfluxDB and Grafana for my monitoring stack. There is a ready-to-use go library quipo/statsd that I haven’t had issues with. One important thing to do is throw any data sends into a goroutine otherwise you might notice a slowdown while it tries to send the data.
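One way to keep metric sends off the hot path is sketched below. It is a variation on spawning a goroutine per send: a buffered channel feeds a single background sender, and a full buffer drops the metric instead of blocking the caller. The `metricQueue` type and the `send` callback are hypothetical; in practice `send` would wrap whatever your StatsD client exposes.

```go
package main

import (
	"fmt"
	"sync"
)

// metricQueue buffers metric names and hands them to a background goroutine.
type metricQueue struct {
	ch   chan string
	done sync.WaitGroup
}

// newMetricQueue starts the sender goroutine; send is the slow call.
func newMetricQueue(size int, send func(string)) *metricQueue {
	q := &metricQueue{ch: make(chan string, size)}
	q.done.Add(1)
	go func() {
		defer q.done.Done()
		for name := range q.ch {
			send(name) // the network round trip happens here, not in the caller
		}
	}()
	return q
}

// Incr never blocks: if the buffer is full the metric is dropped.
func (q *metricQueue) Incr(name string) bool {
	select {
	case q.ch <- name:
		return true
	default:
		return false
	}
}

// Close flushes queued metrics and stops the sender.
func (q *metricQueue) Close() {
	close(q.ch)
	q.done.Wait()
}

// demo queues three metrics and reports how many were sent.
func demo() int {
	sent := 0
	q := newMetricQueue(10, func(name string) { sent++ })
	q.Incr("jobs.processed")
	q.Incr("jobs.processed")
	q.Incr("jobs.failed")
	q.Close()
	return sent
}

func main() {
	fmt.Println(demo()) // prints 3
}
```

Dropping on a full buffer is a deliberate trade-off here: losing a few data points is usually better than slowing down request handling.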

Whether you use Grafana or anything else, it’s important to monitor. Without metrics on how your systems are running (ops/s, latency, etc) you have no insight into whether or not new changes have affected the overall throughput of your system.

Have any extra tips? Leave them in the comments below!

Using a Custom HTTP Dialer in Go

Let’s make a function to generate an HTTP client for us using a custom dialer:

var DefaultDialer = &net.Dialer{}

func GetHttpClient() http.Client {
  tr := &http.Transport{
      Dial:                DefaultDialer.Dial,
  }

  client := http.Client{Transport: tr}
  return client
}

Can you spot the bug?

By omitting the Timeout and KeepAlive values in the example above, we’ve introduced a very subtle bug.

There is also another bug if you don’t handle TLS timeouts as well.

net/Dialer has some documentation on this.

Without providing a KeepAlive and a Timeout value, you could end up with connections that hang indefinitely. By omitting the TLS handshake timeout, the daemon would also hang trying to re-negotiate the SSL connection.

In my case this was causing a very random and hard to reproduce issue where the program would hang indefinitely.

Some good debugging tips: use strace to see which syscall it’s stuck in and, if your daemon is running in the foreground, send it a SIGQUIT signal to dump the goroutine stack traces.

Here is a working version:

var DefaultDialer = &net.Dialer{Timeout: 2 * time.Second, KeepAlive: 2 * time.Second}

func GetHttpClient() http.Client {
  tr := &http.Transport{
      Dial:                DefaultDialer.Dial,
      TLSHandshakeTimeout: 2 * time.Second,
  }

  client := http.Client{Transport: tr}
  return client
}
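On newer Go versions the Transport's Dial field is deprecated in favor of DialContext, so a variant of the working version could look like the sketch below. The `newHTTPClient` name and the overall 10 second `Client.Timeout` are additions for this example rather than part of the original fix, and the durations are illustrative.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"time"
)

// newHTTPClient returns a client whose dial, TLS handshake, and overall
// request time are all bounded.
func newHTTPClient() *http.Client {
	dialer := &net.Dialer{Timeout: 2 * time.Second, KeepAlive: 2 * time.Second}
	tr := &http.Transport{
		DialContext:         dialer.DialContext, // replaces the deprecated Dial field
		TLSHandshakeTimeout: 2 * time.Second,
	}
	// Client.Timeout caps the whole request, including reading the body.
	return &http.Client{Transport: tr, Timeout: 10 * time.Second}
}

func main() {
	client := newHTTPClient()
	fmt.Println(client.Timeout)
}
```

Returning a pointer also avoids copying the client struct every time the helper is called.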

Faster Docker Builds Using a Cache

If you’re using bundler for your Ruby or Rails project with Docker, you will run into Docker having to install your gems every time you build. You can either make a base image that has the bundle cache already on it, or you can make a small cache step in your Dockerfile.

Here I’ve set up a cache user and host to store the cache tar. It will attempt to download and untar it, run bundle, then attempt to tar and re-upload it.

RUN scp -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no cache@172.17.42.1:~/project.tar.gz . || true
RUN tar xzf project.tar.gz || true
RUN bundle install --deployment --without development test
RUN tar czf project.tar.gz vendor
RUN scp -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no project.tar.gz cache@172.17.42.1:~/ || true

Doing this cut build times for my image from a few minutes to a few seconds. If you have any other tricks for speeding up builds, let me know!

Communication Between React Components Using Events

Here is an example of a clean way to communicate between React components without getting stuck passing @prop callbacks all around. Inspired by looking at the new Flux React utilities.

We’re going to start off with a simple HAML file:

index.html.haml
%script{src: "https://cdnjs.cloudflare.com/ajax/libs/react/0.12.2/react-with-addons.js"}
%div{data: { ui: 'alerts' }}
%div{data: { ui: 'widgets' }}
:javascript
  React.renderComponent(Widgets(), document.querySelector('[data-ui="widgets"]'))
  React.renderComponent(Alerts(), document.querySelector('[data-ui="alerts"]'))

Next comes our Widget component.

widget.js.coffee
1
2
3
4
5
6
7
8
{div, button} = React.DOM

Widgets = React.createClass
  render: ->
    div className: 'widget',
      button className: 'btn btn-primary', onClick: (=> @_sendMsg('Testing')), 'Click Me'
  _sendMsg: (msg) ->
    $('[data-ui="alerts"]').trigger("message", ["Widget clicked."])

On line 1 we’re defining some easy helper methods to access the React.DOM object - otherwise on every line we’d be writing something like React.DOM.div or whichever element we were going to call.

Line 4 is our render method. Every time state gets mutated or the component is loaded, this method is called.

On line 6 we’re creating an anonymous function but passing in the local scope using a fat arrow => so we can access our other functions in the class. We call it inside an anonymous function so we can pass an argument to it, in this case the message.

Line 7 is our function that fires the event. I’m using the _sendMsg syntax to denote it is a private function. The first argument to the jQuery event emitter is the event name, followed by a list of arguments.

Now let’s write our Alert handler and go through it line by line.

alerts.js.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{div} = React.DOM
Alerts = React.createClass
  messageTimeout: 5000
  getInitialState: ->
    messages: []

  componentDidMount: ->
    $('[data-ui="alerts"]').on 'message', (event, msg) =>
      msgs = @state.messages
      msgs.push(msg)
      @setState(messages: msgs)

  componentDidUpdate: ->
    @state.messages.map (msg, index) =>
      setTimeout(( => @_removeMsg(index)), @messageTimeout)

  render: ->
    div {},
      @state.messages.map (msg, index) =>
        div className: 'alert alert-info',
          msg

  _removeMsg: (index) ->
    msgs = @state.messages
    msgs.splice(index, 1)
    @setState(messages: msgs)

Line 1 we’re doing the same thing as before, creating a little helper method.

Line 3 is a class variable (we also could have used props here but I went with the class variable instead).

Line 4 is a function that defines the initial state of the component once it is mounted on the page. Here we are saying that there is an empty messages array.

Line 7 is a life cycle event of a React component, called componentDidMount, which is called after the component has been rendered into the DOM and mounted. Here we are telling jQuery to bind to any events that are triggered on the [data-ui="alerts"] object and process them. We take the current messages from @state.messages, push the newest message on to the end and then finally call @setState to mutate the component’s state.

Now the next part on line 13 is how we can gracefully remove messages after they have been rendered. componentDidUpdate is another React life cycle event and is called after a render occurs (and renders occur because the component was updated). We iterate over each message using the map function and call setTimeout with an anonymous function that calls @_removeMsg and passes in an index. @messageTimeout is how we access the class variable defined at the top of the file.

Line 17 is a render call to display all the messages. Note that it is wrapped in a div because you can’t return a collection of objects from render; it must be a single root element with nodes underneath.

Line 23 is our message removal function. We set @state.messages to a local variable, remove one element at index and then mutate the state by setting it to our local variable with @setState.

Below is an example of the final product.

See the Pen ZYoEWg by Josh Rendek (@joshrendek) on CodePen.

I’d like to thank my friend/co-worker Robert Pearce for getting me into React and showing me that everything doesn’t need to be jQuery!