This chapter includes:
Let's look at our multithreaded resource manager example in more detail:
#include <errno.h> #include <stdio.h> #include <stddef.h> #include <stdlib.h> #include <unistd.h> /* * define THREAD_POOL_PARAM_T such that we can avoid a compiler * warning when we use the dispatch_*() functions below */ #define THREAD_POOL_PARAM_T dispatch_context_t #include <sys/iofunc.h> #include <sys/dispatch.h> static resmgr_connect_funcs_t connect_funcs; static resmgr_io_funcs_t io_funcs; static iofunc_attr_t attr; main(int argc, char **argv) { /* declare variables we'll be using */ thread_pool_attr_t pool_attr; resmgr_attr_t resmgr_attr; dispatch_t *dpp; thread_pool_t *tpp; dispatch_context_t *ctp; int id; /* initialize dispatch interface */ if((dpp = dispatch_create()) == NULL) { fprintf(stderr, "%s: Unable to allocate dispatch handle.\n", argv[0]); return EXIT_FAILURE; } /* initialize resource manager attributes */ memset(&resmgr_attr, 0, sizeof resmgr_attr); resmgr_attr.nparts_max = 1; resmgr_attr.msg_max_size = 2048; /* initialize functions for handling messages */ iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs, _RESMGR_IO_NFUNCS, &io_funcs); /* initialize attribute structure used by the device */ iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0); /* attach our device name */ id = resmgr_attach( dpp, /* dispatch handle */ &resmgr_attr, /* resource manager attrs */ "/dev/sample", /* device name */ _FTYPE_ANY, /* open type */ 0, /* flags */ &connect_funcs, /* connect routines */ &io_funcs, /* I/O routines */ &attr); /* handle */ if(id == -1) { fprintf(stderr, "%s: Unable to attach name.\n", argv[0]); return EXIT_FAILURE; } /* initialize thread pool attributes */ memset(&pool_attr, 0, sizeof pool_attr); pool_attr.handle = dpp; pool_attr.context_alloc = dispatch_context_alloc; pool_attr.block_func = dispatch_block; pool_attr.unblock_func = dispatch_unblock; pool_attr.handler_func = dispatch_handler; pool_attr.context_free = dispatch_context_free; pool_attr.lo_water = 2; pool_attr.hi_water = 4; pool_attr.increment = 1; pool_attr.maximum = 50; /* allocate a thread pool handle */ if((tpp = thread_pool_create(&pool_attr, POOL_FLAG_EXIT_SELF)) == NULL) { fprintf(stderr, "%s: Unable to initialize thread pool.\n", argv[0]); return EXIT_FAILURE; } /* start the threads, will not return */ thread_pool_start(tpp); }
The thread pool attribute (pool_attr) controls various aspects of the thread pool, such as which functions get called when a new thread is started or dies, the total number of worker threads, the minimum number, and so on.
Here's the _thread_pool_attr structure:
typedef struct _thread_pool_attr { THREAD_POOL_HANDLE_T *handle; THREAD_POOL_PARAM_T *(*block_func)(THREAD_POOL_PARAM_T *ctp); void (*unblock_func)(THREAD_POOL_PARAM_T *ctp); int (*handler_func)(THREAD_POOL_PARAM_T *ctp); THREAD_POOL_PARAM_T *(*context_alloc)( THREAD_POOL_HANDLE_T *handle); void (*context_free)(THREAD_POOL_PARAM_T *ctp); pthread_attr_t *attr; unsigned short lo_water; unsigned short increment; unsigned short hi_water; unsigned short maximum; unsigned reserved[8]; } thread_pool_attr_t;
The functions that you fill into the above structure can be taken from the dispatch layer (dispatch_block(), ...), the resmgr layer (resmgr_block(), ...) or they can be of your own making. If you're not using the resmgr layer functions, then you'll have to define THREAD_POOL_PARAM_T to some sort of context structure for the library to pass between the various functions. By default, it's defined as a resmgr_context_t but since this sample is using the dispatch layer, we needed it to be adispatch_context_t. We defined it prior to doing the includes above since the header files refer to it. THREAD_POOL_PARAM_T
Part of the above structure contains information telling the resource manager library how you want it to handle multiple threads (if at all). During development, you should design your resource manager with multiple threads in mind. But during testing, you'll most likely have only one thread running (to simplify debugging). Later, after you've ensured that the base functionality of your resource manager is stable, you may wish to “turn on” multiple threads and revisit the debug cycle.
The following members control the number of threads that are running:
The important parameters specify the maximum thread count and the increment. The value for maximum should ensure that there's always a thread in a RECEIVE-blocked state. If you're at the number of maximum threads, then your clients will block until a free thread is ready to receive data. The value you specify for increment will cut down on the number of times your driver needs to create threads. It's probably wise to err on the side of creating more threads and leaving them around rather than have them being created/destroyed all the time.
You determine the number of threads you want to be RECEIVE-blocked on the MsgReceive() at any time by filling in the lo_water parameter.
If you ever have fewer than lo_water threads RECEIVE-blocked, the increment parameter specifies how many threads should be created at once, so that at least lo_water number of threads are once again RECEIVE-blocked.
Once the threads are done their processing, they will return to the block function. The hi_water variable specifies an upper limit to the number of threads that are RECEIVE-blocked. Once this limit is reached, the threads will destroy themselves to ensure that no more than hi_water number of threads are RECEIVE-blocked.
To prevent the number of threads from increasing without bounds, the maximum parameter limits the absolute maximum number of threads that will ever run simultaneously.
When threads are created by the resource manager library, they'll have a stack size as specified by the thread_stack_size parameter. If you want to specify stack size or priority, fill in pool_attr.attr with a proper pthread_attr_t pointer.
The thread_pool_attr_t structure contains pointers to several functions:
The library provides the following thread pool functions:
In the example provided in the multithreaded resource managers section, thread_pool_start(tpp) never returns because we set the POOL_FLAG_EXIT_SELF bit. Also, the POOL_FLAG_USE_SELF flag itself never returns, but the current thread becomes part of the thread pool. |
If no flags are passed (i.e. 0 instead of any flags), the function returns after the thread pool is created.