A Threadpool for the Command-Line

Posted on January 7, 2011 by Brian Jaress
Tags: code, haskell

Introduction

A while ago I wrote a small program in Haskell to see what I thought of the language, added my thoughts to the code, and posted it. Experienced haskellers soon posted critiques and improved versions.

I decided to incorporate those ideas into a second version of the program. Only one change had a significant effect on the program’s behavior, but the code was much cleaner.

This is the third version of the program. I was inspired to create a third version by hearing about GNU Parallel, a very feature-rich alternative. I discovered, at the same time, that xargs can also launch processes in parallel. So I decided to focus my rather redundant program on simplicity and ease of use. The main improvement in the third version is real command-line options and a nice help screen.

What the Program Does

This program is a command line thread pool. You give it shell commands and it runs them, with a fixed number1 running at any one time, while the rest are either finished or waiting to start.

You might, for example, have fifty files to convert using a program that converts a single file each time you run it and want to process three at a time.2

How to Use the Program

The name of the executable is threadpool. It takes an optional argument which is the number of simultaneous threads (the default is three). Give it the commands to run, one per line, through standard input. You may use blank lines (or a separator you specify) to divide the commands into sections. The commands in a section will not be started until all the commands in previous sections are complete.3

You should compile the program using GHC and the -threaded option.4

The Code

Since this is a small program, most of the code deals with handling input. There is one modestly-sized function that serves as a thread pool, and then a significantly larger amount of code for interpreting the command-line options from the user.

Libraries

This program incorporates two key library suggestions from Haskell users.

The first is to use a quantity semaphore from the standard libraries. The second was to use functions for working with multiple monad values from Control.Monad.

import Control.Concurrent (forkIO)
import Control.Concurrent.QSem
import Control.Exception (finally)
import Control.Monad (mapM_, replicateM_)
import Data.List (unfoldr)
import Data.Maybe (mapMaybe)
import System.Environment (getArgs)
import System.Process (runCommand, waitForProcess)
import System.Console.GetOpt

The Thread Pool

The internal thread pool itself doesn’t see the processes it runs—it carries out arbitrary tasks. This version uses a quantity semaphore, per a suggestion on reddit.

After the I wrote the first version of this program, Thomas DuBuisson posted an internal thread pool library on HackageDB. I considered using it, but it’s not quite what I needed, despite being very full-featured.

An even more recent library by Max Bolingbroke has a function with the same type as the one here. I might use that library in future versions.

threadPool :: Int -> [IO ()] -> IO ()
threadPool threadCountRequested tasks = do
    semaphore <- newQSem threadCountActual
    mapM_ (delegate semaphore) tasks
    collect threadCountActual semaphore
        where
            delegate semaphore task = do
                waitQSem semaphore
                forkIO (task `finally` signalQSem semaphore)
            threadCountActual = max threadCountRequested 1
            collect count = replicateM_ count . waitQSem

A quantity semaphore, if you’re like me and haven’t used one before, represents a fixed amount of a resource. You can think of it like a library with a certain number of copies of the same book. If the library owns five copies and three are checked out, you can borrow one of the other two. If all five are out, you have to wait. That captures an essential property of the thread pool, and when the haskellers posted their own solutions there was a definite difference in size and complexity between solutions that used a QSem and those that didn’t.

The threadPool function creates a quantity semaphore and uses it to control the number of threads working on tasks at the same time. The main thread borrows a unit of the semaphore before giving a task to a worker thread, and the worker returns the unit when the task is done. That way, the main thread waits to give out a new task if the number of pending (assigned but incomplete) tasks equals the size of the quantity semaphore.

Once all tasks are assigned, the main thread waits on the entire quantity semaphore to make sure all the workers are done before it exits the program.

Threads are not reused, so threadPool is not technically a thread pool. Since it still looks like one from the outside, the name remains.

Handling Input

Command Line Arguments

The handling of command-line arguments is heavily inspired by the standard library documentation and a baby-steps introduction by Leif Frenzel.

A custom data record communicates the information gathered from the command-line to the rest of the program.

data RunTimeConfig = RunTimeConfig {
    sectionThreadCounts :: [Int],
    sectionSeparator :: String,
    showHelp :: Bool
    }
defaultConfig = RunTimeConfig [] "" False

To make the record value correspond to the command-line arguments, and to set up the help screen, there is an association between command-line flags and record-updating functions. The getOpt function from System.Console.GetOpt takes that association and converts the list of command-line arguments into a list of record-updating functions.

getOpt is actually a general way of associating flags with anything and converting a list of arguments to a list whatever you associated the flags with.5 When I learned about it I thought, “Is that really all you need? Yeah, I guess so.”

helpHeader = "Usage: threadpool [OPTION...]          \n\
\                                                    \n\
\    Pass commands to execute on stdin, and put      \n\
\    section separators on their own line.           \n\
\                                                    \n\
\    Multiple thread counts will apply separately    \n\
\    to each section in order.  If the number of     \n\
\    thread counts is less than the number of        \n\
\    sections, the final thread count is reused      \n\
\    for the extra sections.                         \n\
\ "

commandLineOptions ::
    [OptDescr (RunTimeConfig -> RunTimeConfig)]
commandLineOptions = [
    Option ['h'] ["help"] (NoArg yesShowHelp) "Show help"
    ,
    Option ['n'] ["threadCount"]
        (ReqArg readCount "COUNT") "Number of threads"
    ,
    Option ['s'] ["separator"]
        (ReqArg (\ s conf -> conf {sectionSeparator = s })
            "SEPARATOR")
        "Input section separator line"
    ]
    where
        fullRead :: Read a => String -> Maybe a
        fullRead input = case reads input of
            [(val, "")] -> Just val
            _           -> Nothing
        readCount = maybe
            yesShowHelp
            (\ n conf -> conf {sectionThreadCounts =
                sectionThreadCounts conf ++ [n]})
            . fullRead
        yesShowHelp conf = conf {showHelp = True}

Error handling is pretty basic: If the program sees anything it can’t handle on the command-line, it halts with the help screen as part of the error message. There’s little bit of inconsistency in that some errors are handled by halting the program, while others are handled by setting showHelp to True.

In the happy case where there are no errors, getOpt converts the list of arguments into a list of update functions, and we apply them all to our record with a fold. If you’re new to Haskell like I am, you might have some fun boggling at flip id (which I got from the standard library documentation) but it does exactly what you’d expect in a fold where the list is really a list of update functions.

configFromCommandLine :: [String] -> IO RunTimeConfig
configFromCommandLine argv = return $ rawConfig argv
    where
        rawConfig argv = case
            getOpt Permute commandLineOptions argv of
                (opts, [], []) -> summarize opts
                (_, notRecognized, []) -> error
                    $ "unrecognized arguments: "
                    ++ unwords notRecognized
                    ++ usageInfo helpHeader
                       commandLineOptions
                (_, _, errorMessages) -> error
                    $ concat errorMessages
                    ++ usageInfo helpHeader
                       commandLineOptions
        summarize opts = foldl (flip id) defaultConfig opts

The help screen told the user that if the number of sections is greater than the number of thread counts, the final thread count would be reused for the extra sections. Essentially, the finite list of counts becomes an infinite list by repeating the last item. Since it’s possible for the user to specify no counts on the command line, an empty list becomes an infinite list of the default count.

I can’t shake the feeling that there must be a standard library function for this, but I can’t find it so I’ve written my own.

extended :: a -> [a] -> [a]
extended defaultValue = unfoldr $ next defaultValue
    where
        next def [] = Just (def, [])
        next _ [x] = Just (x, [x])
        next _ (x:xs) = Just (x, xs)

Sectioned Input

The user can divide the list of commands into sections, using separator lines. Before any commands in a new section start, all commands from previous sections will finish. You can achieve the same thing by running the threadpool program multiple times in sequence, but sections are likely to be easier if you are generating the list of commands with another program.

The sectioning code ignores leading and trailing separators and collapses multiple contiguous separators into one, similar to blank lines in LaTeX. I couldn’t find a library function that did exactly what I needed, but several were close enough to be stitched together into my own one-off function.

sections :: (a -> Bool) -> [a] -> [[a]]
sections = unfoldr . splitFirst
    where
        splitFirst isSep raw =
            case break isSep (dropWhile isSep raw) of
                ([], _) ->  Nothing
                parts   ->  Just parts

Once the commands are broken into sections (and the separator lines are discarded) the commands can be converted from string commands into things we can actually execute and placed into threadpools.

executeAllSections config commandSections =
    mapM_ executeSection $ zip (extendedThreadCounts config)
                               commandSections
        where
            executeSection (threadCount, cmds) =
                threadPool threadCount (map execute cmds)
            execute cmd = do
                handle <- runCommand cmd
                waitForProcess handle
                return ()
            extendedThreadCounts =
                extended 3 . sectionThreadCounts

Tying Input to Thread Pools

With all that in place, the main function is fairly straightforward—It just connects the pieces defined above.

main = do
    config <- getArgs >>= configFromCommandLine
    input <- getContents
    if showHelp config
        then putStr $ usageInfo helpHeader commandLineOptions
        else executeAllSections config $
                                sectionedInput config input
    where
        sectionedInput config =
            sections (== sectionSeparator config) . lines

  1. There’s an exception at the end, where there will likely be fewer than that many commands left to run.↩︎

  2. Thread pools usually show up as part of a larger program, where they are used to avoid the overhead of creating and destroying threads and to peg the number of simultaneous threads at something not too large or too small. The first purpose doesn’t really apply here because the tasks themselves have the same type of overhead associated with a thread, and more of it. The second purpose, however, does apply because doing all the tasks at once can take longer and use more resources than doing a few at a time.↩︎

  3. You can also give each section a different number of threads.↩︎

  4. I developed and tested the latest version with GHC version 6.12.1.↩︎

  5. Plus another list of anything left over.↩︎