We’ve built Shelby on a distributed architecture (the subject of another post) and a highly parallelized social stream processing infrastructure (another post as well). To accomplish this, we’re using the Beanstalk message queue (rather than Amazon SQS, since we’re not running on EC2) to let the disparate components of the system communicate.
Beanstalk serves publishers and consumers over sockets and has had no trouble taking a pounding from truly multithreaded usage (i.e. multi-core CPUs coupled with language and OS support for real threads). Unfortunately, I couldn’t find a Ruby client designed for a truly multithreaded environment. The best and most popular client I found is beanstalk-client. While that client is thread-safe, it will hang if used naively. There is one very important aspect of Beanstalk itself that should be made perfectly clear: you must reserve and delete a job using the same connection. That means you cannot call reserve() on a client again before deleting, releasing, or burying the job that client previously returned. Juggling this across many threads gets messy, so I forked that client…
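To see how a thread-safe client can still hang, here is a toy model. It assumes, purely for illustration, that the client serializes every command on a connection with a mutex and that reserve holds that mutex while it waits for a job; ToyConnection and its methods are hypothetical and are not beanstalk-client’s API. Once the main loop calls reserve again on the shared connection, a worker that has finished the previous job cannot delete it:

```ruby
# Toy model, NOT beanstalk-client's code (hypothetical): commands on one
# connection are serialized by a mutex, and reserve holds it while waiting.
class ToyConnection
  attr_reader :deleted

  def initialize
    @lock = Mutex.new
    @ready = Queue.new   # jobs the server has ready for this connection
    @deleted = []
  end

  # Simulates the server making a job available.
  def server_push(body)
    @ready << body
  end

  # Owns the connection until a job arrives, like a blocking reserve.
  def reserve
    @lock.synchronize { @ready.pop }
  end

  def delete(body)
    @lock.synchronize { @deleted << body }
  end

  def busy?
    @lock.locked?
  end
end

conn = ToyConnection.new
conn.server_push("job-1")
job = conn.reserve                 # job-1 is now reserved on conn

# The main loop immediately asks the same connection for the next job.
Thread.new { conn.reserve }        # nothing ready: blocks while holding the lock
sleep 0.01 until conn.busy?        # wait until that reserve owns conn

# A worker finishes job-1 and deletes it on the connection that reserved it.
worker = Thread.new { conn.delete(job) }
finished = worker.join(0.5)        # nil means the worker is still stuck
puts finished.nil? ? "delete is blocked behind reserve" : "delete went through"
```

Giving each outstanding job its own connection, which is what the fork’s pool does, removes this contention.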
The recently open-sourced OvertimeMedia/beanstalk-client-ruby includes a Beanstalk::ThreadedPool class, which manages a set of Beanstalk clients. A well-behaved thread first checks out a client with ThreadedPool.reserve_connection() (which blocks if no connections are currently available) and then calls reserve() on that client. The connection is not returned to the pool of available connections until the job it reserved is deleted, released, or buried. This bookkeeping is conveniently wrapped into the job itself, so finishing the job frees its connection.
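To make the check-out/check-in cycle concrete, here is a minimal sketch of a pool with the same shape, built on Ruby’s thread-safe Queue (whose pop blocks while the queue is empty). StubConnection, PooledJob, Checkout, and SketchPool are hypothetical names and the connections are stubs with no network; this is not the fork’s implementation. release and bury would follow the same pattern as delete.

```ruby
# Hypothetical stand-in for one Beanstalk connection (no network).
class StubConnection
  def initialize(id)
    @id = id
    @count = 0
  end

  def reserve
    @count += 1
    "conn#{@id}-job#{@count}"
  end

  def delete(_body)
    # a real connection would send the delete command here
  end
end

# A reserved job that remembers its connection and the pool it came from.
class PooledJob
  attr_reader :body

  def initialize(body, connection, pool)
    @body = body
    @connection = connection
    @pool = pool
  end

  # Delete on the connection that reserved the job, then return that
  # connection to the pool.
  def delete
    @connection.delete(@body)
    @pool.check_in(@connection)
  end
end

# A checked-out connection whose reserve hands back a pool-aware job.
Checkout = Struct.new(:connection, :pool) do
  def reserve
    PooledJob.new(connection.reserve, connection, pool)
  end
end

class SketchPool
  def initialize(size)
    @idle = Queue.new   # thread-safe; pop blocks while it is empty
    size.times { |i| @idle << StubConnection.new(i) }
  end

  # Blocks until some connection is idle.
  def reserve_connection
    Checkout.new(@idle.pop, self)
  end

  def check_in(connection)
    @idle << connection
  end

  def available
    @idle.size
  end
end

pool = SketchPool.new(2)
first = pool.reserve_connection.reserve
pool.reserve_connection.reserve
puts pool.available   # 0: a third reserve_connection would block now
first.delete
puts pool.available   # 1: deleting first returned its connection
```

The pool size doubles as a concurrency cap: with N connections, at most N jobs can be reserved and unfinished at once.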
For example, if you have a threaded work queue and want to process up to 100 Beanstalk jobs at a time…
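```ruby
require 'beanstalk-client'
# 100 pooled connections cap the number of in-flight jobs at 100
@beanstalk_pool = Beanstalk::ThreadedPool.new('localhost', 100, 'tubename')
loop do
  # Blocks until a connection is free, then until a job is ready
  job = @beanstalk_pool.reserve_connection.reserve
  # Deleting the job hands its connection back to the pool
  @work_queue.process_in_new_thread(job) { |j| work_on(j); j.delete }
end
```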
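One thing the loop above leaves to @work_queue: if work_on raises before the delete runs, that job’s connection never goes back to the pool (unless process_in_new_thread handles it), and after 100 such failures reserve_connection blocks forever. Below is a minimal sketch of such a helper, assuming jobs respond to delete and release as described above; StubJob and this process_in_new_thread are hypothetical stand-ins, not part of the fork. It deletes the job on success and releases it back to the tube on error, so the connection is freed either way.

```ruby
# Hypothetical stand-in for a reserved job: records how it was finished.
class StubJob
  attr_reader :body, :outcome

  def initialize(body)
    @body = body
    @outcome = nil
  end

  def delete
    @outcome = :deleted
  end

  def release
    @outcome = :released
  end
end

# Hypothetical helper in the spirit of @work_queue.process_in_new_thread:
# run the work on its own thread and always finish the job, so its
# connection can return to the pool even when the work raises.
def process_in_new_thread(job)
  Thread.new do
    begin
      yield job
      job.delete
    rescue StandardError
      job.release   # put the job back on the tube for another attempt
    end
  end
end

ok = StubJob.new("ok")
bad = StubJob.new("bad")
threads = [ok, bad].map do |job|
  process_in_new_thread(job) { |j| raise "boom" if j.body == "bad" }
end
threads.each(&:join)
puts ok.outcome    # deleted
puts bad.outcome   # released
```

Because the helper finishes the job itself, the caller’s block only does the work and no longer calls delete.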
My multithreaded fork doesn’t implement the full Beanstalk API; I selfishly built out only the parts I’m using. Still, the fork has been running the Shelby alpha for two weeks and has processed millions of jobs without issue. If you use the client, please let me know what you think. I’d love to improve it and help anyone else who is using it.
P.S. It’s tough to take time out to write up these posts when you could be writing some badass multithreaded code ;-]