Asynchronous Batch Logging without Blocking Threads
Recently, a student in my Threading class presented me with a problem that many people have. He wanted his server application to batch a set of log entries in memory and to flush the batched entries out to persistent storage periodically or after some number of entries were captured in the batch. This is an extremely difficult problem because there are many race conditions to consider. At any given time, any or all of the following could be happening simultaneously:
The batch could hit the desired threshold forcing it to be flushed to the log
The timer could fire forcing whatever entries are in the batch to be flushed to the log. If the timer fires and there are no batched entries, then no flush should occur and we just reset the timer.
While a batch is being flushed, new log entries could be added to a new batch
I thought this was such a common and interesting problem that I decided to try my hand at it to see what I could come up with. When architecting scalable software, you always want to strive for solutions that don't block threads since threads are such expensive resources. And so, I set out on a mission to solve all these race conditions without blocking any threads. This means that most thread synchronization locks are out of the question since they all have the potential to block threads. However, a SpinLock can be used since it doesn't block threads; a SpinLock should only be used around code that is guaranteed to execute for a very short period of time so that threads don't waste too much CPU time spinning. Below is my C# solution to this problem:
using System;
using System.Threading;
using Batch = System.Collections.Generic.List&lt;System.String&gt;;

public sealed class Log {
   private static readonly TimeSpan Infinite = TimeSpan.FromMilliseconds(Timeout.Infinite);
   // NOTE: SpinLock is a mutable struct; this field must NOT be readonly,
   // or each call to Enter/Exit would operate on a defensive copy of the lock
   private SpinLock m_lock = new SpinLock(false);
   private readonly TimeSpan m_maxInterval;
   private readonly Int32 m_maxCount;
   private Batch m_batch = null;
   private Timer m_timer = null;

   public Log(Int32 maxCount, TimeSpan maxInterval) {
      m_maxCount = maxCount;
      m_maxInterval = maxInterval;
      CreateNewBatch();
   }

   // Returns reference to old batch so it can be flushed to the log
   private Batch CreateNewBatch() {
      // NOTE: This method MUST be called under the SpinLock unless called by the ctor
      var oldBatch = m_batch;
      m_batch = new Batch();
      if (m_timer != null) m_timer.Dispose();
      m_timer = new Timer(TimeExpired, m_batch, m_maxInterval, Infinite);
      return oldBatch;
   }

   private void TimeExpired(Object timersBatch) {
      Console.Write("Timeout: ");
      Boolean taken = false;
      Batch oldBatch = null;
      m_lock.Enter(ref taken);
      try {
         if (timersBatch != m_batch) {
            // This timer is not for the current batch; do nothing
            // Solves race condition where timer fires AS batch is changing
            Console.WriteLine("Not for current batch");
         } else if (m_batch.Count == 0) {
            // No items in the batch, reset timer & use the same batch
            m_timer.Change(m_maxInterval, Infinite);
            Console.WriteLine("No items, resetting interval");
         } else {
            // Items in the batch, swap to a new batch
            oldBatch = CreateNewBatch();
            Console.WriteLine("Forcing flush, creating new batch");
         }
      } finally {
         if (taken) m_lock.Exit();
      }
      // If there was an old batch, transfer it
      TransferBatch(oldBatch);
   }

   public void Add(String msg) {
      Boolean taken = false;
      Batch oldBatch = null;
      m_lock.Enter(ref taken);
      try {
         m_batch.Add(msg);
         Console.WriteLine("Adding msg: Current count={0}", m_batch.Count);
         if (m_batch.Count == m_maxCount) oldBatch = CreateNewBatch();
      } finally {
         if (taken) m_lock.Exit();
      }
      // If batch swapped, transfer it
      TransferBatch(oldBatch);
   }

   // TransferBatch is static to make it clear that this method is
   // NOT attached to the object in any way; this is an independent operation
   // that does not rely on any object state or a lock
   private static void TransferBatch(Batch batch) {
      // NOTE: this should be called while NOT under a lock
      if (batch == null) return; // No batch to transfer, return
      // Start transfer of batch (I would do this asynchronously)...
      Console.WriteLine("Transferring {0} messages.", batch.Count);
      // Call a BeginXxx or XxxAsync method here to transfer the batch to the persistent store...
   }
}
class Program {
   static void Main() {
      // Some simple test code:
      var log = new Log(2, TimeSpan.FromSeconds(30));
      Console.ReadLine(); // Press Enter to start adding messages
      for (Int32 msg = 0; msg < 500; msg++) {
         log.Add("Msg " + msg);
         Console.ReadLine(); // Press Enter to add the next message
      }
   }
}
In the code above, there is a potential problem depending on what you use for your persistent storage: the TransferBatch method initiates the transfer by calling your desired asynchronous I/O operation (a BeginXxx or XxxAsync method), but TransferBatch does not wait for the transfer to complete. This means that another batch could fill and also call TransferBatch, introducing yet another race condition where the 2nd batch gets written to persistent storage before the 1st batch. Now, in my student's case, the log entries were being written to a Windows Azure Storage Table and the partition key contains the time the messages were written to the batch, which means that the Windows Azure Table will keep the messages in chronological order even if this race condition occurs. So, the code above works fine in this scenario.
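To illustrate why a time-based key makes the store order-insensitive: Azure Table keys sort lexically, so a key built from a zero-padded tick count sorts in chronological order no matter which batch arrives first. The helper below is a hypothetical sketch for illustration only; it is not the exact key scheme my student used:

```csharp
using System;

static class KeyHelper {
   // Hypothetical sketch: a time-based key that sorts lexically in
   // chronological order. Zero-padding to a fixed width (19 digits covers
   // DateTime.MaxValue.Ticks) makes lexical order match numeric order.
   public static String ChronologicalKey(DateTime utc) {
      return utc.Ticks.ToString("D19");
   }
}
```

Because the key encodes the time each message entered the batch, the table ends up sorted correctly even if the batches were transferred out of order.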
However, if you are not using a storage system that automatically maintains message order, then you will have to modify the TransferBatch method above. At first, you might think you could call TransferBatch while under the SpinLock to maintain order. But this is not guaranteed to work: the I/O operation is issued asynchronously, which means that multiple I/O operations could be sent to the Windows device driver, and a device driver is allowed to execute I/O operations in any order it chooses; so FIFO is not guaranteed. You could solve the problem by issuing the transfer synchronously while holding the SpinLock, but then the SpinLock is not guaranteed to be held for a short period of time and this could cause excessive CPU spinning. Or, you could replace the SpinLock with a blocking lock, but then your threads might block, causing additional thread pool threads to be created and reducing the scalability of your application.
What we need is a way to ensure that batch transfers execute one at a time in sequential order without blocking any threads. To solve this problem, I'd recommend the use of my ReaderWriterGate class, which is in my Power Threading Library. I describe how the class works in this MSDN article. I'd create an instance of the ReaderWriterGate and then queue methods to it that write the batch to storage. The ReaderWriterGate maintains FIFO ordering and it doesn't block any threads since it uses a SpinLock internally too.
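The Power Threading Library itself isn't reproduced here, but the core idea can be sketched with just a ConcurrentQueue and an Interlocked flag: completed batches are enqueued, and whichever thread successfully claims the "draining" flag processes them one at a time in FIFO order while every other thread returns immediately. This is my own illustrative stand-in for the pattern, not the actual ReaderWriterGate implementation:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Illustrative sketch (not the ReaderWriterGate): transfers batches one at a
// time in FIFO order; calling threads never block, they either drain or return
public sealed class SequentialTransferrer {
   private readonly ConcurrentQueue<List<String>> m_pending = new ConcurrentQueue<List<String>>();
   private readonly Action<List<String>> m_transfer; // writes one batch to the store
   private Int32 m_draining = 0; // 0 = idle, 1 = some thread is draining

   public SequentialTransferrer(Action<List<String>> transfer) { m_transfer = transfer; }

   public void QueueTransfer(List<String> batch) {
      m_pending.Enqueue(batch);
      // Try to become the draining thread; if another thread already is, return
      while (Interlocked.CompareExchange(ref m_draining, 1, 0) == 0) {
         List<String> b;
         while (m_pending.TryDequeue(out b)) m_transfer(b); // FIFO, one at a time
         Interlocked.Exchange(ref m_draining, 0);
         // Re-check: a batch may have been enqueued just as we released the flag
         if (m_pending.IsEmpty) break;
      }
   }
}
```

Note that in this sketch m_transfer runs synchronously on whichever thread happens to drain the queue; to keep the transfer itself asynchronous as described above, you would instead release the draining flag from the I/O completion callback before dequeuing the next batch.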
For me, this was an extremely challenging and fun problem to solve. I know a lot of people have a very similar problem and I hope that some of you will find my code above useful. I also think it is valuable to consider the code as a model for how many other problems can be solved without blocking threads thereby increasing your application's scalability.
Jeffrey Richter's Blog
