Blocking queues are essential to implementing the producer-consumer pattern in concurrent programming. The pattern is simple. A queue of work items is maintained. For example, the queue can be a list of image URLs that need to be downloaded. A producer adds work items to the queue, and a consumer removes them. Producers and consumers work independently from different threads. If the queue is empty, the consumer blocks when it tries to get an item from the queue. As soon as the producer posts an item, the consumer ends its wait and retrieves the item.
The work queue is normally FIFO – an item that is queued earlier gets consumed earlier – though a more complex implementation can build in a notion of priority.
There are two types of producer-consumer problem – bounded and unbounded.
- Unbounded – The work queue size is unlimited. A producer can post as many work items as it wants.
- Bounded – The work queue size is limited. If a producer produces at a more rapid rate than the consumer can consume then the queue will get filled up. When a queue is full, the producer will block as it tries to put a new item in the queue. As soon as the consumer takes an item from the queue, the producer will succeed in adding the item.
In the bounded situation both producer and consumer may block, depending on the rate they complete their work. In the unbounded situation only the consumer can block.
The good thing about the bounded situation is that the producer can be throttled if the consumer is unable to keep up with the workload. This prevents unnecessary production and wasted CPU. Let’s say that a slideshow program has 500 images in the master list. Every image is displayed for 5 seconds. Here, the consumption rate is very slow – one image per 5 seconds. It doesn’t make a lot of sense to download all 500 images in rapid succession. We can limit the queue size to, say, 10 images. That way, the producer will only download an image if the work queue has room in it. This saves network communication and CPU usage.
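The throttling described above can be sketched with a plain pthread condition variable. This is a minimal C illustration, not the article’s queue class – the `BoundedGate` type, the `CAPACITY` constant, and the function names are all mine. The producer claims a slot before producing and blocks while all slots are taken; the consumer frees a slot when it finishes an item:

```c
#include <pthread.h>
#include <assert.h>

#define CAPACITY 10   /* illustrative queue limit */

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  notFull;
    int count;        /* items currently in the "queue" */
} BoundedGate;

void gate_init(BoundedGate *g) {
    pthread_mutex_init(&g->lock, NULL);
    pthread_cond_init(&g->notFull, NULL);
    g->count = 0;
}

/* Producer side: blocks while the queue is at capacity. */
void gate_enter(BoundedGate *g) {
    pthread_mutex_lock(&g->lock);
    while (g->count == CAPACITY)
        pthread_cond_wait(&g->notFull, &g->lock);
    g->count++;       /* room available: claim a slot */
    pthread_mutex_unlock(&g->lock);
}

/* Consumer side: frees a slot and wakes one blocked producer. */
void gate_leave(BoundedGate *g) {
    pthread_mutex_lock(&g->lock);
    g->count--;
    pthread_cond_signal(&g->notFull);
    pthread_mutex_unlock(&g->lock);
}
```

In the slideshow example, the downloader would call `gate_enter` before fetching each image and the display code would call `gate_leave` after showing one, capping the number of downloaded-but-unseen images at 10.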
In iOS, we generally recommend using Grand Central Dispatch (GCD) for producer-consumer work. With GCD, you don’t queue data, like the URLs in our example. Instead, you queue code blocks:
dispatch_async(queue, ^{
    //Show an image
    //...
});
The queue is unbounded and the producer is not throttled. Instead, GCD throttles the consumer. It will dispatch the work to a thread only if it thinks that the device is sufficiently idle.
While GCD’s implementation is simple, I do not like the fact that the producer is not throttled. In some situations, a bounded queue is preferred.
Over the next few articles, I will show how to implement blocking queues – both bounded and unbounded. Consider using them only if GCD is not an option. Generally speaking, for an unbounded queue, GCD works just fine. But I will show an implementation in any case.
Unbounded Blocking Queue
In this article, I will show how to implement an unbounded blocking queue. Once again, GCD has a good alternative solution for you. Consider using it first. I am posting this solution for a few reasons:
- From an academic point of view, this example will help you understand pthread mutex and condition locking better.
- The implementation closely mimics Java’s LinkedBlockingQueue. If you are porting an app from Android, this will come in handy.
- To set the stage for the bounded blocking queue, where GCD doesn’t do as good a job. We will get to that in a follow-up article.
Without any further ado, let’s get started.
Firstly, for an unlimited FIFO list, we will need a linked list. A regular array would be too slow. Unfortunately, the Objective-C collection classes do not include a linked list, so we will need to implement the functionality ourselves. I will keep things simple and let the blocking queue class manage the linked list. But we will need a Node class that represents a work item.
Node.h
#import <Foundation/Foundation.h>

@interface Node : NSObject

@property (nonatomic, strong) id data;
@property (nonatomic, strong) Node *next;

@end
Node.m
#import "Node.h"

@implementation Node

@synthesize data;
@synthesize next;

@end
Now we will look at the UnboundedBlockingQueue class. This is the equivalent of Java’s LinkedBlockingQueue.
UnboundedBlockingQueue.h
#import <Foundation/Foundation.h>
#import <pthread.h>
#import "Node.h"

@interface UnboundedBlockingQueue : NSObject {
@private
    pthread_mutex_t lock;
    pthread_cond_t notEmpty;
    Node *first, *last;
}

- (UnboundedBlockingQueue *) init;
- (void) put: (id) data;
- (id) take: (int) timeout; //timeout is in seconds

@end
UnboundedBlockingQueue.m
#import "UnboundedBlockingQueue.h"
#import <sys/time.h> //gettimeofday
#import <errno.h>    //ETIMEDOUT

@implementation UnboundedBlockingQueue

- (UnboundedBlockingQueue *) init {
    if ((self = [super init])) {
        last = nil;
        first = nil;
        pthread_mutex_init(&lock, NULL);
        pthread_cond_init(&notEmpty, NULL);
    }
    return self;
}

- (void) dealloc {
    pthread_mutex_destroy(&lock);
    pthread_cond_destroy(&notEmpty);
}

- (void) put: (id) data {
    pthread_mutex_lock(&lock);

    Node *n = [[Node alloc] init];
    n.data = data;

    if (last != nil) {
        last.next = n;
    }
    if (first == nil) {
        first = n;
    }
    last = n;

    pthread_cond_signal(&notEmpty);
    pthread_mutex_unlock(&lock);
}

- (id) take: (int) timeout {
    id data = nil;
    struct timespec ts;
    struct timeval now;

    pthread_mutex_lock(&lock);

    gettimeofday(&now, NULL);
    ts.tv_sec = now.tv_sec + timeout;
    ts.tv_nsec = now.tv_usec * 1000; //preserve the sub-second part

    while (first == nil) {
        if (pthread_cond_timedwait(&notEmpty, &lock, &ts) == ETIMEDOUT) {
            pthread_mutex_unlock(&lock);
            return nil;
        }
    }

    data = first.data;
    first = first.next;
    if (first == nil) {
        last = nil; //Empty queue
    }

    pthread_mutex_unlock(&lock);

    return data;
}

@end
If you are new to pthreads, focus on the pthread_cond_timedwait call. It does three things:
- Releases the lock on the mutex.
- Starts to wait for notification (signal) on the condition.
- Once the signal arrives, waits to reacquire the lock on the mutex.
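Those three steps can be seen in a tiny standalone C sketch. Nothing here comes from the queue class – the function name and the one-second timeout are mine. Since nothing ever signals the condition, the call sleeps with the mutex released and returns ETIMEDOUT once the deadline passes, reacquiring the mutex before it returns:

```c
#include <pthread.h>
#include <sys/time.h>
#include <time.h>
#include <errno.h>

/* Hypothetical helper: wait on a condition nobody will signal,
   to exercise the ETIMEDOUT path. */
int wait_with_timeout(int seconds) {
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
    struct timespec ts;
    struct timeval  now;

    gettimeofday(&now, NULL);
    ts.tv_sec  = now.tv_sec + seconds;     /* absolute deadline */
    ts.tv_nsec = now.tv_usec * 1000;       /* keep the sub-second part */

    pthread_mutex_lock(&lock);
    /* Atomically releases `lock`, sleeps, then reacquires `lock`
       before returning -- whether it timed out or was signaled. */
    int rc = pthread_cond_timedwait(&cond, &lock, &ts);
    pthread_mutex_unlock(&lock);
    return rc;
}
```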
For all practical purposes, the take: and put: methods run entirely within an exclusive lock of the mutex. However, while pthread_cond_timedwait() is waiting, the lock is relinquished. It is essential that it is: this is what allows the producer to call the put: method and put an item in the queue.
Finally, we should only call pthread_cond_timedwait() if there is a reason to do so – that is, the queue is empty. This check is tricky due to spurious wakeups. In short, in multiprocessor environments, pthread_cond_timedwait()/pthread_cond_wait() can return even when no thread has signaled the condition. To deal with that, we employ this logic:
- If pthread_cond_timedwait times out, we return nil. This will only happen if the queue is still empty.
- If pthread_cond_timedwait returns for any other reason, we go back and check whether the queue is empty (first == nil). In case of a spurious wakeup, the queue can still be empty. This is why the wait call is wrapped in a while loop and not an if block.
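The same while-loop guard, stripped down to its essentials, looks like this in plain C. This is an illustrative sketch (the `available` flag stands in for the queue’s `first != nil` check; the names are mine). However the consumer is woken – by a real signal or spuriously – it re-tests the predicate before proceeding:

```c
#include <pthread.h>
#include <stddef.h>

static pthread_mutex_t mtx      = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  notEmpty = PTHREAD_COND_INITIALIZER;
static int available = 0;   /* stands in for "first != nil" */

/* Producer: publish one item and signal the consumer. */
void *producer(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mtx);
    available = 1;
    pthread_cond_signal(&notEmpty);
    pthread_mutex_unlock(&mtx);
    return NULL;
}

/* Consumer: while, not if -- a spurious wakeup with no item
   available simply falls back into the wait. */
int consume(void) {
    pthread_mutex_lock(&mtx);
    while (!available)
        pthread_cond_wait(&notEmpty, &mtx);
    available = 0;          /* take the item */
    pthread_mutex_unlock(&mtx);
    return 1;
}
```

Replacing the `while` with an `if` here would let a spurious wakeup fall through to the "take the item" line with nothing in the queue – exactly the bug the loop in take: prevents.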
Finally, here is an example use of the class.
- (void) startProcess {
    //Start producer thread
    [NSThread detachNewThreadSelector:@selector(doPut)
                             toTarget:self
                           withObject:nil];
    //Start consumer thread
    [NSThread detachNewThreadSelector:@selector(doGet)
                             toTarget:self
                           withObject:nil];
}

//Producer
- (void) doPut {
    for (int i = 0; i < 5; ++i) {
        @autoreleasepool {
            NSString *str = [NSString stringWithFormat: @"Data %d", i];
            NSLog(@"Putting %@", str);
            [queue put: str];
        }
    }
}

//Consumer
- (void) doGet {
    while (TRUE) {
        @autoreleasepool {
            NSString *str = [queue take: 10];
            if (str == nil) {
                NSLog(@"Queue is empty");
            } else {
                [NSThread sleepForTimeInterval: 1];
                NSLog(@"Got %@", str);
            }
        }
    }
}