Handling 1 Million Requests per Minute with Golang

Golang

Posted by Essen on January 2, 2020

Original article link: requires a VPN to access.

I have been working in the anti-spam, anti-virus and anti-malware industry for over 15 years at a few different companies, and now I know how complex these systems could end up being due to the massive amount of data we handle daily.

Currently I am CEO of smsjunk.com and Chief Architect Officer at KnowBe4, both companies active in the cybersecurity industry.

What is interesting is that for the last 10 years or so as a Software Engineer, all the web backend development that I have been involved in has been mostly done in Ruby on Rails. Don’t get me wrong, I love Ruby on Rails and I believe it’s an amazing environment, but after a while you start thinking and designing systems in the ruby way, and you forget how efficient and simple your software architecture could have been if you could leverage multi-threading, parallelization, fast executions and small memory overhead. For many years, I was a C/C++, Delphi and C# developer, and I just started realizing how much less complex things could be with the right tool for the job.

I am not very big on the language and framework wars that the interwebs are always fighting about. I believe efficiency, productivity and code maintainability rely mostly on how simply you can architect your solution.

The Problem

While working on a piece of our anonymous telemetry and analytics system, our goal was to be able to handle a large amount of POST requests from millions of endpoints. The web handler would receive a JSON document that may contain a collection of many payloads that needed to be written to Amazon S3, in order for our map-reduce systems to later operate on this data.

Traditionally we would look into creating a worker-tier architecture, utilizing things such as:

  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • and so on…

And set up two different clusters, one for the web front-end and another for the workers, so we could scale up the amount of background work we could handle.

But since the beginning, our team knew that we should do this in Go, because during the discussion phases we saw this could potentially be a very high-traffic system. I have been using Go for about 2 years or so, and we had developed a few systems here at work, but none that would get this amount of load.

We started by creating a few structures to define the web request payload that we would be receiving through the POST calls, and a method to upload it into our S3 bucket.

type PayloadCollection struct {
	WindowsVersion  string    `json:"version"`
	Token           string    `json:"token"`
	Payloads        []Payload `json:"data"`
}

type Payload struct {
    // [redacted]
}

func (p *Payload) UploadToS3() error {
	// the storageFolder method ensures that there are no name collisions in
	// case we get the same timestamp in the key name
	storage_path := fmt.Sprintf("%v/%v", p.storageFolder(), time.Now().UnixNano())

	// S3Bucket is a package-level bucket handle configured elsewhere.
	bucket := S3Bucket

	b := new(bytes.Buffer)
	encodeErr := json.NewEncoder(b).Encode(p)
	if encodeErr != nil {
		return encodeErr
	}

	// Everything we post to the S3 bucket should be marked 'private'
	var acl = s3.Private
	var contentType = "application/octet-stream"

	return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
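
To make the wire format concrete, here is a minimal decoding sketch. It is not from the original post: the document contents are invented for illustration, and the Payload entries are empty objects because its fields are redacted above.

func decodeSample() {
	// A hand-written document matching the struct tags above (invented values).
	sample := `{"version": "10.0", "token": "abc123", "data": [{}, {}]}`

	var collection PayloadCollection
	if err := json.Unmarshal([]byte(sample), &collection); err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(collection.Payloads)) // prints 2
}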

Naive approach to goroutines

Initially we took a very naive implementation of the POST handler, just trying to parallelize the job processing into a simple goroutine:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	
	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {
		go payload.UploadToS3()   // <----- DON'T DO THIS
	}

	w.WriteHeader(http.StatusOK)
}

For moderate loads, this could work for the majority of people, but this quickly proved to not work very well at a large scale. We were expecting a lot of requests but not in the order of magnitude we started seeing when we deployed the first version to production. We completely underestimated the amount of traffic.

The approach above is bad in several different ways. There is no way to control how many goroutines we are spawning. And since we were getting 1 million POST requests per minute, this code of course crashed and burned very quickly.

Trying again

We needed to find a different way. From the beginning we started discussing how we needed to keep the lifetime of the request handler very short and spawn processing in the background. Of course, this is what you must do in the Ruby on Rails world, otherwise you would block all the available worker web processors, whether you are using puma, unicorn, or passenger (let’s not get into the JRuby discussion, please). Then we would have needed to leverage common solutions to do this, such as Resque, Sidekiq, SQS, etc. The list goes on, since there are many ways of achieving this.

So the second iteration was to create a buffered channel where we could queue up some jobs and upload them to S3, and since we could control the maximum number of items in our queue and we had plenty of RAM to queue up jobs in memory, we thought it would be okay to just buffer jobs in the channel queue.

var Queue chan Payload

func init() {
    Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    ...
    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        Queue <- payload
    }
    ...
}

And then to actually dequeue jobs and process them, we were using something similar to this:

func StartProcessor() {
    for {
        select {
        case job := <-Queue:
            // Queue carries Payload values directly, so we upload the
            // received payload itself.
            job.UploadToS3()  // <-- STILL NOT GOOD
        }
    }
}

To be honest, I have no idea what we were thinking. This must have been a late night full of Red Bulls. This approach didn’t buy us anything: we had traded flawed concurrency for a buffered queue that was simply postponing the problem. Our synchronous processor was only uploading one payload at a time to S3, and since the rate of incoming requests was much larger than the ability of the single processor to upload to S3, our buffered channel was quickly reaching its limit and blocking the request handler’s ability to queue more items.
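
To put rough, illustrative numbers on it (assumptions for the sake of the arithmetic, not measured figures): 1 million requests per minute is roughly 16,000-17,000 requests per second, each carrying one or more payloads. If a single synchronous S3 upload takes even 50ms, one processor drains at most about 20 jobs per second, so a buffer of, say, 100,000 items fills up in well under ten seconds.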

We were simply avoiding the problem, and had effectively started a countdown to our system’s death. Our latency rates kept increasing at a constant rate within minutes of deploying this flawed version.

The Better Solution

We decided to utilize a common pattern when using Go channels, in order to create a 2-tier channel system: one for queuing jobs and another to control how many workers operate on the JobQueue concurrently.

The idea was to parallelize the uploads to S3 at a somewhat sustainable rate, one that would not cripple the machine nor start generating connection errors from S3. So we opted for creating a Job/Worker pattern. For those that are familiar with Java, C#, etc., think of this as the Golang way of implementing a Worker Thread-Pool utilizing channels instead.

var (
	// Pool sizes come from the environment, parsed to int so they can size
	// channels and loop bounds. A missing or invalid value parses to 0, so
	// real deployments should validate these.
	MaxWorker, _ = strconv.Atoi(os.Getenv("MAX_WORKERS"))
	MaxQueue, _  = strconv.Atoi(os.Getenv("MAX_QUEUE"))
)

// Job represents the job to be run
type Job struct {
	Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
	go func() {
		for {
			// register the current worker into the worker queue.
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				// we have received a work request.
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf("Error uploading to S3: %s", err.Error())
				}

			case <-w.quit:
				// we have received a signal to stop
				return
			}
		}
	}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}

We have modified our Web request handler to create an instance of the Job struct with the payload and send it into the JobQueue channel for the workers to pick up.

func payloadHandler(w http.ResponseWriter, r *http.Request) {

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Read the body into a string for json decoding
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Go through each payload and queue items individually to be posted to S3
	for _, payload := range content.Payloads {

		// let's create a job with the payload
		work := Job{Payload: payload}

		// Push the work onto the queue.
		JobQueue <- work
	}

	w.WriteHeader(http.StatusOK)
}

During our web server initialization we create a Dispatcher and call Run() to create the pool of workers and to start listening for jobs that would appear in the JobQueue.

// JobQueue must be created before the handler can send work into it;
// a send on a nil channel would block forever.
JobQueue = make(chan Job, MaxQueue)

dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()

Below is the code for our dispatcher implementation:

type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	WorkerPool chan chan Job
	maxWorkers int
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}

func (d *Dispatcher) Run() {
	// starting n number of workers
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}

	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			// a job request has been received
			go func(job Job) {
				// try to obtain a worker job channel that is available.
				// this will block until a worker is idle
				jobChannel := <-d.WorkerPool

				// dispatch the job to the worker job channel
				jobChannel <- job
			}(job)
		}
	}
}
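
The subtle part of this pattern is the pool of channels (WorkerPool chan chan Job). An idle worker parks its private JobChannel into the pool, so when the dispatcher receives a job channel from the pool, it is by construction waiting for some worker to become idle; sending the job on that channel then hands it to exactly that worker.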

Note that we provide the number of maximum workers to be instantiated and added to our pool of workers. Since we have utilized Amazon Elasticbeanstalk for this project with a dockerized Go environment, and we always try to follow the 12-factor methodology to configure our systems in production, we read these values from environment variables. That way we could control the number of workers and the maximum size of the JobQueue, so we could quickly tweak these values without requiring re-deployment of the cluster.

var (
	// As above: parsed to int so the values can size channels and loops.
	MaxWorker, _ = strconv.Atoi(os.Getenv("MAX_WORKERS"))
	MaxQueue, _  = strconv.Atoi(os.Getenv("MAX_QUEUE"))
)
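
For completeness, here is a minimal sketch pulling the pieces above together at startup. It is not from the original post: the route, the port, and the explicit JobQueue initialization are illustrative assumptions made so the example is self-contained.

func main() {
	// Create the buffered job queue and start the dispatcher, as shown above.
	JobQueue = make(chan Job, MaxQueue)

	dispatcher := NewDispatcher(MaxWorker)
	dispatcher.Run()

	// Register the payload handler and serve. The route and port here are
	// illustrative assumptions, not details from the original system.
	http.HandleFunc("/payload", payloadHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}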

Immediately after we deployed it, we saw all of our latency rates drop to insignificant numbers and our ability to handle requests surge drastically.

Minutes after our Elastic Load Balancers were fully warmed up, we saw our ElasticBeanstalk application serving close to 1 million requests per minute. We usually have a few hours during the morning in which our traffic spikes to more than a million per minute.

As soon as we deployed the new code, the number of servers dropped considerably, from 100 servers to about 20 servers.

After we had properly configured our cluster and the auto-scaling settings, we were able to lower it even further, to only four EC2 c4.large instances, with Elastic Auto-Scaling set to spawn a new instance if CPU stays above 90% for 5 minutes straight.

Conclusion

Simplicity always wins in my book. We could have designed a complex system with many queues, background workers, and complex deployments, but instead we decided to leverage the power of Elasticbeanstalk auto-scaling and the efficient, simple approach to concurrency that Golang provides out of the box.

It’s not every day that you have a cluster of only 4 machines, probably much less powerful than my current MacBook Pro, handling POST requests that write to an Amazon S3 bucket 1 million times every minute.

There is always a right tool for the job. Sometimes, when your Ruby on Rails system needs a very powerful web handler, think a little outside of the Ruby ecosystem for simpler yet more powerful alternative solutions.
