Multiple Wait Semaphore
Sept. 8, 2000: Changed to use the broadcase condition variable model. (See Chapter 10
in the Second Edition.)
Page 235 in the book mentioned that you could atomically wait for multiple semaphore
units by using a mutex to guard the semaphore. This is correct, but limited. I call this
the "First Come, First Served" multiple wait semantics as a large request will
block all subsequent requests (regardless of thread priority or request size) until the
initial request is satisfied. This might be the semantics you require, but, you
might actually require "First Satisfiable" semantics where a small request will
be satisfied before a large request.
This small, integrated collection of functions implements the required routines. Here
are some features:
- The CreateSemaphoreMW function has a flag to specify
"First Satisfiable" or "First Come, First Served" semantics.
- You can also name the MW semaphore so that it can be
shared between processes.
- The WaitForSingleObjectMW function has a parameter to
specify the number of required units.
- The CloseSynchHandle function plays the same role as CloseHandle. It is intended to be general purpose so that new
objects can be implemented over time.
How would you use such a capability? The First Edition Chapter 11's server showed one
example, where different threads require different amounts of a limited serving
capability. Here is another example, which is fairly realistic. Chapter 10 in the Second
Edition takes the idea of a semaphore a step further by implementing a message queueing
system.
Suppose you have a "Monitor" thread that is not to start (or get past a
certain point) until ALL worker threads are running or have reached some specified point.
Here is what to do, using a "First Satisfiable" multiple wait semaphore:
- Initialize the semaphore to its maximum count (at creation time), where the maximum
count is the number of worker threads.
- Each worker thread releases one unit when it starts.
- The monitor thread waits atomically for the maximum count. Therefore, it will not start
until all threads are running, preventing, perhaps, a race condition.
Here is an exercise involving the above idea and an extra event together with a
"First Come, First Served" semaphore. Construct a "thread rendezvous"
where all worker threads block at a certain point and they are not released until the
monitor finds that some number of them have blocked at the rendezvous. Can you construct a
rendezvous object that does not require the monitor thread?
Here is another exercise. Construct a "reverse semaphore" object that is
signaled if and only if the count is equal to the maximum value. A waiting thread that is
released decreases the semaphore by the count specified in the wait and the reverse
semaphore becomes unsignaled. You can do this with some minor modifications to the
functions here. See http://www.microsoft.com/msj/0797/win320797top.htm
for a different solution, although, in that article, the semaphore is signaled only when
it is 0, and there is no multiple wait.
The implementation has some other interesting features:
- A synchronization handle structure is defined in the header file. It requires an event
and two mutexes. You will see why all three are required. Notice that this means that it
is possible to implement a semaphore with the other two objects.
- The WaitForSingleObjectMW has at least one none-obvious
step; take a look at the code. Also, this function is efficient; there is no busy
waiting.. Again, take a look at the code to see how this is achieved. This solution is
an example of the "condition variable model" described in Chapter 10 (Second
Edition).
- The choice of pulsing a manual reset event is essential; again, take a look at the
comments.
- Interprocess synchronization requires that there be synchronization handle structures in
shared memory to maintain the object state. However, there must also be a
"shadow" structure in the process-private memory containing handles to the
mutexes and event. Notice how this works and how shared memory objects are released for
reuse after the last open handle is released.
- The test program shows a multiple producer/consumer model. When you bring it up, you are
prompted for the number of producer threads (in this test process) and the number of
consumer threads, as well as the object name. To test the interprocess synchronization,
bring up two separate processes running the same test program. Use the same name for each,
and have only producer threads in one (zero consumer threads) and only consumer threads in
the other.
COMMENTS ON TESTING, DEBUGGING, AND VALIDATING
Threaded, synchronized systems present significant challenges not only in the design
and implementation, but also in all stages of quality assurance. There are many traps for
the unwary. However, writing good code need not be that difficult, and it can be fun and
rewarding. Here are some hints as well as some personal opinions. Some of the personal
opinions are not widely shared.
- Testing is necessary, but NOT sufficient, to assure correct operation. This can
be said of any program, but it is particularly so when synchronization issues are
involved. There can be any number of subtle race conditions, deadlocks, and so on that may
never appear in testing, and, if they do, they may not be repeatable or easy to diagnose.
- The same can be said of debugging. I never used the debugger on these examples despite
the fact that Visual C++ has excellent debugging capabilities. Debugging also changes the
timing of threads and can mask the defects you are trying to find. Finally, the VC++
debugger actually changes the behavior of functions such as PulseEvent;
Microsoft has a discussion of this at http://support.microsoft.com/support/kb/articles/Q173/2/60.ASP.
- It is absolutely necessary to design and implement the system correctly.
- Furthermore, you must carefully read and analyze your synchronization code and consider
all the possibilities. For example, after every wait or release call, ask what would
happen if the thread were preempted at that instant and another thread were to execute the
same code. Furthermore, ask if all the conditions that you expect to hold at this point
can be assured to hold. For example, consider the potential race condition in Chapter 10's
sortMT program. My initial implementations of the program
here and of sortMT program had defects that never showed
up in tests but became apparent when I went through these thought experiments.
- Ultimately, you must prove to yourself (and, perhaps, to your colleagues) that
the code is correct. The proof may not be formal (although formal proofs can play a role),
but it is worthwhile going through the motions.
- In my personal experience, even the best programmers engage in extreme forms of arm
waving when discussing synchronization, and the buzz words fly furiously. Try not to
engage in such activity, and be suspicious of anyone who does.
- Haste is the biggest enemy. Take the time to assure yourself that the code is correct.
I have seen some on-line discussion that emphasizes this point. Under the debugger, PulseEvent had a different effect than without the debugger.
Apparently, when pulsing a manual reset event, the signal could be lost under certain
circumstances when using the debugger.
I plan to add some material regarding informal proofs of correctness for dealing with
synchronized threads.
Now that I have said that, I fully expect to be humbled when someone points out an
obvious (or not so obvious) defect in my code. Please take the challenge to do so. If you
succeed, I will still be grateful and will make the excuse that I was in a hurry (see the
last bullet) even though I did perform all the recommended steps. Jan 5, 2000: I just
found a bug after 18 months using this code. It is now fixed in the code below.
This is the function library.
/* SynchObj.c
Copyright (c) 1998, Johnson M. Hart
Permission is granted for any and all use providing that this copyright is
properly acknowledged.
There are no assurances of suitability for any use whatsoever.
Library of synchronization "objects" to supplement the standard Windows
events and mutexes.
For simplicity, this implementation shares a header file (synchize.h) with
another library of synchronization objects (EvMxSem.c) that is designed
to operate under Windows CE, which does not have semaphores.
Implementation notes:
1. All required internal data structures are allocated on the process's heap
2. Where appropriate, a new error code is returned (see the header
file), or, if the error is a Windows error, that code is unchanged.
3. Notice the new handle type, "SYNCHHANDLE" which has handles, counters,
and other information. This structure will grow as new objects are added
to this set; some members are specific to only one or two of the objects;
in particular, the structure is more general than is required just for
some of the specific examples.
4. Mutexes are used for critical sections. These could be replaced with
CRITICAL_SECTION objects if the objects are used totally within
a process. Even better, if the object is not named, then you
know that it will be used only within the process and you could
make the decision at create time. HOWEVER, you will lose the timeout
option if you do so, AND you will also lose the naming and interprocess
capabilities.With Windows NT4.0 and later, SignalObjectAndWait is a possibility.
5. These simulated semaphores can be named and hence can be shared
between processes.
6. No Windows semaphores are used, so this implementation (with null names)
will operate under Windows CE.
7. The implementation shows several interesting aspect of synchronization, some
of which are specific to Windows and some of which are general. There are pointed
out in the comments as appropriate.
8. These objects have a WaitForSingleObject equivalent. There is, however, no
equivalent to WaitForMultipleObjects as this is very difficult, if not impossible
to do efficiently outside of the kernel.
*/
#include "EvryThng.h"
#include "Synchize.h"
static SYNCHHANDLE CleanUp (SYNCHHANDLE);
static SYNCHHANDLE AllocSynchHandle (LPCTSTR, LPTSTR, LPTSTR, LPTSTR, LPBOOL);
/*********************************************
MULTIPLE WAIT SEMAPHORE function family.
These semaphores allow a thread to wait atomically for several "units"
rather than just one. The DEFAULT wait semantics are considered to be
"First Satisfiable". That is, a thread with a satisfiable request
will be released even if a thread (regardless of priority) has an
outstanding request that is not currently satisfiable.
Optional semantics, specified at create time, are
"First Come, First Served". A thread with request larger than the
number of units currently available will block all subsequent
threads, even those with satisfiable requests or those of
higher priority. The solution on p. 235 is a First Come, First Served
solution, but here we implement it within the general framework.
*/
SYNCHHANDLE CreateSemaphoreMW (
LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, /* pointer to security attributes */
LONG lInitialCount, /* initial count */
LONG lMaximumCount, /* maximum count */
BOOL fFirstComeFirstServed, /* First Satisfiable is the default */
LPCTSTR lpName )
/* Multiple wait semaphore creation.
Requires a counter, a mutex to protect the semaphore state, and a
manual-reset event.
Here are the rules that must always hold between the manual-reset event
and the mutex (any violation of these rules by the multiple wait semaphore
functions will, in all likelihood, result in a defect):
1. No thread can set, pulse, or reset the event,
nor can it access any part of the SYNCHHANDLE structure,
without first gaining ownership of the mutex.
BUT, a thread can wait on the event without owning the mutex
(this is clearly necessary or else the event could never be set).
2. The event is in a signaled state if and only if the count has just been
changed so that it is greater than zero. To assure this property, the count should
be checked after every semaphore decrease.
3. The semaphore count is always >= 0 and <= the maximum count
*/
{
SYNCHHANDLE hSynch = NULL, hShare = NULL;
TCHAR MutexName [MAX_PATH] = _T(""), EventName [MAX_PATH] = _T(""),
MutexaName[MAX_PATH] = _T("");
BOOL NewObject;
if (lInitialCount > lMaximumCount || lMaximumCount < 0 || lInitialCount < 0) {
/* Bad parameters */
SetLastError (SYNCH_ERROR);
return NULL;
}
hSynch = AllocSynchHandle (lpName, MutexName, EventName, MutexaName, &NewObject);
if (hSynch == NULL) {
SetLastError (SYNCH_ERROR);
return NULL;
}
/* Create the object handles. These are always created in the process's
local handle. */
hSynch->hMutex = CreateMutex (lpSemaphoreAttributes, FALSE, (LPCTSTR)MutexName);
/* Create the event. It is initially signaled if and only if the
initial count is > 0 */
hSynch->hEvent = CreateEvent (lpSemaphoreAttributes, TRUE /* manual reset */,
lInitialCount > 0, (LPCTSTR)EventName);
hSynch->hMutexa = NULL;
hSynch->dwFlags = 6; /* An event and a mutex, but no secondary mutex
unless it is a First Come, First Served Multiple Wait semaphore */
if (fFirstComeFirstServed) {
hSynch->hMutexa = CreateMutex (lpSemaphoreAttributes, FALSE, (LPCTSTR)MutexaName);
hSynch->dwFlags = 7; /* All three objects were created */
}
/* Set the object state, always in the local handle (for quick reference)
and in the shared handle if there is one. */
hSynch->MaxCount = lMaximumCount;
hSynch->CurCount = lInitialCount; /* The local value is not maintained. */
hSynch->fFirstComeFirstServed = fFirstComeFirstServed;
_tcscpy (hSynch->lpName, lpName);
hShare = hSynch->SharedHandle;
if (NewObject && hShare != NULL ) {
/* There is a new shared handle. Set the state if it is new */
hShare->MaxCount = lMaximumCount;
hShare->CurCount = lInitialCount; /* The local value is not maintained. */
hShare->fFirstComeFirstServed = fFirstComeFirstServed;
_tcscpy (hShare->lpName, lpName);
}
/* Return with the handle, or, if there was any error, return
a null after closing any open handles and freeing any allocated memory */
return CleanUp (hSynch);
}
BOOL ReleaseSemaphoreMW (SYNCHHANDLE hSemMW, LONG cReleaseCount, LPLONG lpPreviousCount)
/* Multiple wait equivalent to ReleaseSemaphore */
{
BOOL Result = TRUE;
SYNCHHANDLE hState;
/* Gain access to the object to assure that the release count
would not cause the total count to exceed the maximum */
/* The state is maintained locally if the object is unnamed and
in shared memory for a named object */
hState = (hSemMW->SharedHandle == NULL) ? hSemMW : hSemMW->SharedHandle;
_try {
WaitForSingleObject (hSemMW->hMutex, INFINITE);
*lpPreviousCount = hState->CurCount;
if (hState->CurCount + cReleaseCount > hState->MaxCount || cReleaseCount <= 0) {
SetLastError (SYNCH_ERROR);
Result = FALSE;
_leave;
}
hState->CurCount += cReleaseCount;
/* Pulse the manual reset. All threads currently waiting on the
event will be released, and then the event will be reset. */
PulseEvent (hSemMW->hEvent);
}
_finally {
ReleaseMutex (hSemMW->hMutex);
return Result;
}
}
DWORD WaitForSemaphoreMW (SYNCHHANDLE hSemMW, LONG cSemRequest, DWORD dwMilliseconds)
/* Multiple wait semaphore equivalent of WaitForSingleObject.
The second parameter is the number of units requested, and the waiting will be
either first come, first served or first available, depending on the option
selected at create time. */
{
DWORD WaitResult, TimeOut = min (dwMilliseconds, CV_TIMEOUT);
/* cv_TIMEOUT is required for the broadcase condition variables model.
See Chapter 10 in the Second Edition. The nominal value is 50. */
SYNCHHANDLE hState;
/* The state is maintained locally if the object is unnamed and
in shared memory for a named object */
hState = (hSemMW->SharedHandle == NULL) ? hSemMW : hSemMW->SharedHandle;
if (cSemRequest <= 0 || cSemRequest > hState->MaxCount) {
SetLastError (SYNCH_ERROR);
return WAIT_FAILED;
}
/* If this is a First Come, First Served MW semaphore, then this thread
seizes the secondary mutex to block all other threads.
Do this BEFORE waiting on the mutex protecting the semaphore state. */
if (hSemMW->fFirstComeFirstServed) {
WaitResult = WaitForSingleObject (hSemMW->hMutexa, dwMilliseconds);
if (WaitResult != WAIT_OBJECT_0 && WaitResult != WAIT_ABANDONED_0) return WaitResult;
}
WaitResult = WaitForSingleObject (hSemMW->hMutex, dwMilliseconds);
if (WaitResult != WAIT_OBJECT_0 && WaitResult != WAIT_ABANDONED_0) return WaitResult;
while (hState->CurCount < cSemRequest) {
/* The count is less than the number requested.
The thread must wait on the event (which, by the rules, is currently reset)
for semaphore resources to become available. First, of course, the mutex
must be released so that another thread will be capable of setting the event.
*/
ReleaseMutex (hSemMW->hMutex);
/* Wait for the event, indicating that some other thread has increased
the semaphore count.
The event is autoreset and signaled with a SetEvent (not PulseEvent)
so only those threads currently waiting will be released.
*/
WaitResult = WaitForSingleObject (hSemMW->hEvent, dwMilliseconds);
if (WaitResult != WAIT_OBJECT_0) return WaitResult; /* Add logic: go back and try the while again! */
/* Seize the semaphore so that the semaphore state can be retested
at the top of the loop. Note that there may be other threads
waiting at this same point, but only one at a time can test the
semaphore count. */
WaitResult = WaitForSingleObject (hSemMW->hMutex, dwMilliseconds);
if (WaitResult != WAIT_OBJECT_0 && WaitResult != WAIT_ABANDONED_0) return WaitResult;
}
/* hState->CurCount < cSemRequest (the request can be satisfied), and
this thread owns the semaphore */
hState->CurCount -= cSemRequest;
if (hState->CurCount) SetEvent (hSemMW->hEvent);
ReleaseMutex (hSemMW->hMutex);
if (hSemMW->fFirstComeFirstServed) ReleaseMutex (hSemMW->hMutexa);
return WaitResult;
}
BOOL CloseSynchHandle (SYNCHHANDLE hSynch)
/* Close a synchronization handle.
Improvement: Test for a valid handle before closing the handle */
{
BOOL Result = TRUE;
if (hSynch->hEvent != NULL) Result = Result && CloseHandle (hSynch->hEvent);
if (hSynch->hMutex != NULL) Result = Result && CloseHandle (hSynch->hMutex);
if (hSynch->hMutexa != NULL) Result = Result && CloseHandle (hSynch->hMutexa);
if (hSynch->SharedHandle != NULL)
InterlockedDecrement (&(hSynch->SharedHandle->RefCount));
if (hSynch->ViewOfFile != NULL) UnmapViewOfFile (hSynch->ViewOfFile);
HeapFree (GetProcessHeap (), 0, hSynch);
return (Result);
}
static SYNCHHANDLE CleanUp (SYNCHHANDLE hSynch)
{ /* Prepare to return from a create of a synchronization handle.
If there was any failure, free any allocated resources
"Flags" indicates which Windows objects are required in the
synchronization handle */
BOOL ok = TRUE;
DWORD Flags;
if (hSynch == NULL) return NULL;
Flags = hSynch->dwFlags;
if (Flags & 4 == 1 && hSynch->hEvent == NULL) ok = FALSE;
if (Flags & 2 == 1 && hSynch->hMutex == NULL) ok = FALSE;
if (Flags & 1 == 1 && hSynch->hMutexa == NULL) ok = FALSE;
if (!ok) {
CloseSynchHandle (hSynch);
return NULL;
}
/* Everything worked */
return hSynch;
}
static SYNCHHANDLE AllocSynchHandle (LPCTSTR lpName,
LPTSTR MutexName, LPTSTR EventName, LPTSTR MutexaName,
LPBOOL pfNewObject)
/* Allocate memory for a synchronization handle. Unnamed objects
have their handles created directly in the process heap, whereas
named objects have two handles, one allocated locally to
contain the handles (which are local to this process) and
out of a shared memory pool mapped to the paging file for
the shared object state.
Also, create the names for the three internal objects
and determine whether this is a new object that should be initialized. */
{
HANDLE hSynchDB; /* Mutex to protect the entire synchronization object database */
HANDLE hMap, hFile = (HANDLE)0xFFFFFFFF;
/* Shared memory and file handle for maintaining synch objects */
BOOL FirstTime;
SYNCHHANDLE pView, pNew = NULL, pFirstFree, hLocal;
hLocal = HeapAlloc (GetProcessHeap(), HEAP_ZERO_MEMORY, SYNCH_HANDLE_SIZE);
*pfNewObject = TRUE;
if (hLocal == NULL || lpName == NULL
|| _tcscmp (lpName, _T("")) == 0) /* The object is not named */
return hLocal;
/* The object is named. Create names for the internal objects. */
_stprintf (MutexName, _T("%s%s"), lpName, _T(".mtx"));
_stprintf (EventName, _T("%s%s"), lpName, _T(".evt"));
_stprintf (MutexaName, _T("%s%s"), lpName, _T(".mtxa"));
/* Lock access to the synchronization object data base to prevent other threads
from concurrently creating another object of the same name.
All processes and threads use this same well-known mutex name. */
hSynchDB = CreateMutex (NULL, FALSE, SYNCH_OBJECT_MUTEX);
WaitForSingleObject (hSynchDB, INFINITE);
/* Access the shared memory where the synchronization objects are maintained.
It is necessary, however, first to check if this is the first time
that an object has been created so that the shared memory-mapped
table can be initialized.
The test is achieved with an OpenFileMapping call. */
_try {
hMap = OpenFileMapping (FILE_MAP_WRITE, FALSE, SYNCH_FM_NAME);
FirstTime = (hMap == NULL);
if (FirstTime)
hMap = CreateFileMapping (hFile, NULL, PAGE_READWRITE, 0, SIZE_SYNCH_DB, SYNCH_FM_NAME);
if (hMap == NULL) _leave;
pView = (SYNCHHANDLE)MapViewOfFile (hMap, FILE_MAP_WRITE, 0, 0, SIZE_SYNCH_DB);
if (pView == NULL) _leave;
if (FirstTime) memset (pView, 0, SIZE_SYNCH_DB);
/* Search to see if an object of this name already exists.
The entry the mapped record is used for bookkeeping, in
case it is ever needed in the future. An empty slot
is detected by a 0 reference count. */
pFirstFree = NULL;
for (pNew = pView+1; pNew < pView + SYNCH_MAX_NUMBER; pNew++) {
if ((pFirstFree == NULL) && (pNew->RefCount <= 0)) pFirstFree = pNew;
if ((pNew->lpName != NULL)
&& _tcscmp (pNew->lpName, lpName) == 0) break; /* Name exists */
}
if (pNew < pView + SYNCH_MAX_NUMBER) { /* The name exists */
*pfNewObject = FALSE;
} else if (pFirstFree != NULL) {
/* The name does not exist, but we have found and empty slot. */
*pfNewObject = TRUE;
pNew = pFirstFree;
} else { /* The name does not exist, but there is no free slot. */
pNew = NULL;
*pfNewObject = TRUE;
}
}
_finally {
if (pNew != NULL) hLocal->ViewOfFile = pView;
hLocal->SharedHandle = pNew;
if (hLocal->SharedHandle != NULL)
InterlockedIncrement (&(hLocal->SharedHandle->RefCount));
ReleaseMutex (hSynchDB);
return hLocal;
}
}
This is the header file.
/* synchize.h - header file to go with Synchize.c */
typedef struct _SYNCH_HANDLE_STRUCTURE {
HANDLE hEvent;
HANDLE hMutex;
HANDLE hMutexa;
LONG MaxCount;
volatile LONG CurCount;
LONG RefCount; /* Number of references to a shared handle */
BOOL fFirstComeFirstServed; /* For multiple wait semaphores */
DWORD dwFlags; /* Used as required */
LPVOID ViewOfFile; /* For named objects, this is the mapped file
view containing the handle */
struct _SYNCH_HANDLE_STRUCTURE *SharedHandle; /* For named objects, this is the
shared part of the handle, containing
the handle state */
TCHAR lpName[MAX_PATH];
} SYNCH_HANDLE_STRUCTURE, *SYNCHHANDLE;
#define SYNCH_HANDLE_SIZE sizeof (SYNCH_HANDLE_STRUCTURE)
#define SYNCH_MAX_NUMBER 1000
#define SIZE_SYNCH_DB SYNCH_MAX_NUMBER*SYNCH_HANDLE_SIZE
#define SYNCH_OBJECT_MUTEX _T("SynchObjectMutex")
#define SYNCH_FM_NAME _T("SynchSharedMem")
/* Error codes - all must have bit 29 set */
#define SYNCH_ERROR 0X20000000 /* EXERCISE - REFINE THE ERROR NUMBERS */
SYNCHHANDLE CreateSemaphoreCE (LPSECURITY_ATTRIBUTES, LONG, LONG, LPCTSTR);
BOOL ReleaseSemaphoreCE (SYNCHHANDLE, LONG, LPLONG);
DWORD WaitForSemaphoreCE (SYNCHHANDLE, DWORD);
BOOL CloseSynchHandle (SYNCHHANDLE);
SYNCHHANDLE CreateSemaphoreMW (LPSECURITY_ATTRIBUTES, LONG, LONG,BOOL, LPCTSTR);
BOOL ReleaseSemaphoreMW (SYNCHHANDLE, LONG, LPLONG);
DWORD WaitForSemaphoreMW (SYNCHHANDLE, LONG, DWORD);
This is the test program
/* Test the Multiple Wait Semaphore synchronization compound object */
#include "EvryThng.h"
#include "Synchize.h"
static DWORD WINAPI ProducerTh (LPVOID);
static DWORD WINAPI ConsumerTh (LPVOID);
static DWORD WINAPI MonitorTh (LPVOID);
static BOOL WINAPI CtrlcHandler (DWORD);
static SYNCHHANDLE hSemMW;
static LONG SemMax, SemInitial, NumProducers, NumConsumers, TotalExcess = 0;
static HANDLE hTestMutex;
static volatile BOOL Debug = FALSE;
static volatile BOOL Exit = FALSE;
static volatile DWORD NumSuccess = 0, NumFailures = 0, NumProduced = 0,
FailureCount = 0, NumConsumed = 0;
static LPDWORD ProducerCount, ConsumerCount;
int _tmain (int argc, LPTSTR argv[])
/* Test CE semaphores; that is, semaphores created out of a mutex, a
counter, and an autoreset event */
{
LPHANDLE Producer, Consumer;
LONG iThread;
DWORD ThreadId;
HANDLE hMonitor;
BOOL FirstCFS; /* First come, first served, or first available? */
TCHAR Name [MAX_PATH] = _T("");
FILETIME FileTime; /* Times to seed random #s. */
SYSTEMTIME SysTi;
/* Randomly initialize the random number seed */
GetSystemTime (&SysTi);
SystemTimeToFileTime (&SysTi, &FileTime);
srand (FileTime.dwLowDateTime);
if (!SetConsoleCtrlHandler (CtrlcHandler, TRUE))
ReportError (_T("Failed to set console control handler"), 1, TRUE);
_tprintf (_T("\nEnter SemMax, SemInitial, NumConsumers:, NumProducers:, FCFS?, Debug: "));
_tscanf (_T("%d %d %d %d %d %d"), &SemMax, &SemInitial, &NumConsumers,
&NumProducers, &FirstCFS, &Debug);
_tprintf (_T("\nEnter Name - NULL for none: "));
_tscanf (_T("%s"), Name);
if (_tcscmp (Name, _T("NULL")) == 0) _tcscpy (Name, _T(""));
/* if (Debug) */ _tprintf (_T("You entered: %d %d %d %d %d %s\n"),
SemMax, SemInitial, NumConsumers, NumProducers, FirstCFS, Name);
/* Create a mutex to synchronize various aspects of the test, such as
updating statistics. A CS won't work as we need a WaitForMultipleObjects
involving this mutex in the monitor thread. */
hTestMutex = CreateMutex (NULL, FALSE, NULL);
if (hTestMutex == NULL) ReportError (_T("Could not create test mutex"), 2, TRUE);
NumProduced = SemInitial; /* Initialize the usage statistics for the semaphore. */
hSemMW = CreateSemaphoreMW (NULL, SemInitial, SemMax, FirstCFS, Name);
if (hSemMW == NULL) {
_tprintf (_T("LastError: %x\n"), GetLastError());
ReportError (_T("Failed to create MW semaphore"), 3, TRUE);
}
if (Debug) _tprintf ("MW Semaphore created successfully\n");
/* Create all the semaphore consuming and releasing threads */
/* Create arrays to hold the thread handles, and also
create integer arrays to count number of iterations by each thread. By
observing these counts, we can be sure that no deadlocks have occurred. */
if (NumConsumers > 0) {
Consumer = malloc (NumConsumers*sizeof(HANDLE));
if (Consumer == NULL)
ReportError (_T("Cannot allocate Consumer handles"), 5, FALSE);
ConsumerCount = HeapAlloc (GetProcessHeap(), HEAP_ZERO_MEMORY,
NumConsumers*sizeof(DWORD));
if (ConsumerCount == NULL)
ReportError (_T("Cannot allocate Consumer handles"), 5, FALSE);
}
if (NumProducers > 0) {
Producer = malloc (NumProducers*sizeof(HANDLE));
if (Producer == NULL)
ReportError (_T("Cannot allocate Producer handles"), 4, FALSE);
ProducerCount = HeapAlloc (GetProcessHeap(), HEAP_ZERO_MEMORY,
NumProducers*sizeof(DWORD));
if (ProducerCount == NULL)
ReportError (_T("Cannot allocate Producer handles"), 4, FALSE);
}
hMonitor = (HANDLE)_beginthreadex (NULL, 0, MonitorTh, (LPVOID)5000,
CREATE_SUSPENDED, &ThreadId);
if (hMonitor == NULL)
ReportError (_T("Cannot create monitor thread"), 6, TRUE);
SetThreadPriority (hMonitor, THREAD_PRIORITY_HIGHEST);
for (iThread = 0; iThread < NumConsumers; iThread++) {
Consumer [iThread] = (HANDLE)_beginthreadex (NULL, 0, ConsumerTh,
(LPVOID)iThread, CREATE_SUSPENDED, &ThreadId);
if (Consumer[iThread] == NULL)
ReportError (_T("Cannot create consumer thread"), 3, TRUE);
}
for (iThread = 0; iThread < NumProducers; iThread++) {
Producer [iThread] = (HANDLE)_beginthreadex (NULL, 0, ProducerTh,
(LPVOID)iThread, CREATE_SUSPENDED, &ThreadId);
if (Producer[iThread] == NULL)
ReportError (_T("Cannot create producer thread"), 3, TRUE);
}
WaitForSingleObject (hTestMutex, INFINITE);
_tprintf (_T("All threads created successfully.\n"));
ReleaseMutex (hTestMutex);
for (iThread = 0; iThread < NumConsumers; iThread++)
ResumeThread (Consumer [iThread]);
for (iThread = 0; iThread < NumProducers; iThread++)
ResumeThread (Producer [iThread]);
ResumeThread (hMonitor);
if (NumConsumers > 0)
WaitForMultipleObjects (NumConsumers, Consumer, TRUE, INFINITE);
if (NumProducers > 0)
WaitForMultipleObjects (NumProducers, Producer, TRUE, INFINITE);
WaitForSingleObject (hMonitor, INFINITE);
for (iThread = 0; iThread < NumConsumers; iThread++)
CloseHandle (Consumer [iThread]);
for (iThread = 0; iThread < NumProducers; iThread++)
CloseHandle (Producer [iThread]);
HeapFree (GetProcessHeap(), 0, Producer);
HeapFree (GetProcessHeap(), 0, Consumer);
HeapFree (GetProcessHeap(), 0, ProducerCount);
HeapFree (GetProcessHeap(), 0, ConsumerCount);
CloseHandle (hMonitor);
if (!CloseSynchHandle (hSemMW))
ReportError (_T("Failed closing synch handle"), 0, TRUE);
_tprintf (_T("All threads terminated\n"));
return 0;
}
static DWORD WINAPI ProducerTh (LPVOID ThNumber)
/* Producer thread:
1. Compute for a random period of time.
2. Produce a random number of units, uniformly distributed [1, SemMax]
The production will return with no units produced if it would cause the
semaphore count to exceed the maximum (this is consistent with Windows
semaphore semantics).
3. Produce twice as quickly as consuming, as many productions will fail because
the number of units is too large.
*/
{
DWORD Id = (DWORD)ThNumber, Delay, i, k;
LONG PrevCount = 0, RelCount = 0;
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug) _tprintf (_T("Starting producer number %d.\n"), Id);
ReleaseMutex (hTestMutex);
while (!Exit) {
/* Delay without necessarily giving up processor control.
Notice how the production rate is the same as the consumption
rate so as to keep things balanced. */
Delay = rand() / 2;
for (i = 0; i < Delay; i++) k = rand()*rand() / 2; /* Waste time */
if (rand() % 3 == 0)
Sleep (rand()/(RAND_MAX/500));
/* Give up the processor 1/3 of the time
just to make the thread interaction more interesting */
RelCount = (long)(((float)rand()/RAND_MAX) * SemMax)+1;
if (!ReleaseSemaphoreMW (hSemMW, RelCount, &PrevCount)) {
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug)
_tprintf (_T("Producer #: %d Failed. PrevCount = %d RelCount = %d\n"),
Id, PrevCount, RelCount);
NumFailures++; /* Maintain producer statistics */
FailureCount += RelCount;
ReleaseMutex (hTestMutex);
} else {
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug)
_tprintf (_T("Producer #: %d Succeeded. PrevCount = %d, RelCount = %d\n"),
Id, PrevCount, RelCount);
NumSuccess++;
NumProduced += RelCount;
ReleaseMutex (hTestMutex);
}
ProducerCount[Id]++; /* Number of producer iterations. */
}
_endthreadex (0);
return 0;
}
static DWORD WINAPI ConsumerTh (LPVOID ThNumber)
{
DWORD Id = (DWORD)ThNumber, Delay, i, k;
LONG SeizeCount;
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug) _tprintf (_T("Starting consumer number %d.\n"), Id);
ReleaseMutex (hTestMutex);
while (!Exit) {
/* Delay without necessarily giving up processor control */
Delay = rand();
for (i = 0; i < Delay; i++) k = rand()*rand(); /* Waste time */;
if (rand() % 3 == 0) Sleep (rand()/(RAND_MAX/1000));
/* Give up the processor 1/3 of the time
just to make the thread interaction more interesting */
/* Random request size */
SeizeCount = (long)(((float)rand()/RAND_MAX) * SemMax)+1;
if (WaitForSemaphoreMW (hSemMW, SeizeCount, rand()) != WAIT_OBJECT_0) {
if (Debug) {
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug) _tprintf (_T("Consumer #: %d Timed out waiting for %d units\n"),
Id, SeizeCount);
ReleaseMutex (hTestMutex);
}
} else { /* The semaphore unit was obtained successfully - update statistics */
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug) _tprintf (_T("Consumer #: %d Obtained %d units\n"), Id, SeizeCount);
NumConsumed += SeizeCount;
ReleaseMutex (hTestMutex);
}
ConsumerCount[Id]++; /* Number of consumer iterations. */
}
_endthreadex (0);
return 0;
}
static DWORD WINAPI MonitorTh (LPVOID Delay)
/* Monitor thread - periodically check the statistics for consistency */
{
LONG CurCount = 0, Max = 0, ExcessCount, i;
SYNCHHANDLE hState;
HANDLE hBoth[2] = {hTestMutex, hSemMW->hMutex};
/* This is cheating to reach inside the opaque handle, but we need to get
simultaneous access to both the semaphore state and to the statistics
in order to test consistency. Some consistency failures are still possible,
however, as the statistics are updated independently of changing the semaphore
state, so the semaphore might be changed by another thread before the producer
or consumer can update the global statistics, Thus, there can be no absolute
consistency between the semaphore state and the global statistics maintained
by this test program, and, to make them consistent would require putting a
critical section around the producer/consumer loop bodies, which would destroy
the asynchronous operation required for testing.
But, the general state of the semaphore can still be checked, and we can
also be sure that it has not been corrupted. */
/* Get the handle to the semaphore state */
hState = (hSemMW->SharedHandle == NULL) ? hSemMW : hSemMW->SharedHandle;
while (!Exit) {
WaitForMultipleObjects (2, hBoth, TRUE, INFINITE);
CurCount = hState->CurCount;
Max = hState->MaxCount;
ExcessCount = NumProduced - (NumConsumed + CurCount);
/* Excess of production over consumption
which should average out to 0 */
_tprintf (_T
("Monitor statistics:\nProduced: %d\nConsumed: %d\nCurrent: %d\nMaximum: %d\n"),
NumProduced, NumConsumed, CurCount, Max);
/* For Correctness, we must have: Produced == Consumed + CurCount */
if (ExcessCount != 0 || CurCount < 0 || CurCount > SemMax || SemMax != Max) {
_tprintf (_T("Discrepancy: %d\n"), ExcessCount);
TotalExcess += ExcessCount;
}
else _tprintf (_T("Consistency test passed. Total excess count: %d\n"), TotalExcess);
_tprintf (_T("Successful release calls: %d\nFailed release calls: %d\n"),
NumSuccess, NumFailures);
_tprintf (_T("Units not released: %d\n"), FailureCount);
_tprintf (_T("Number of Consumer iterations, by thread\n"));
for (i = 0; i < NumConsumers; i++) _tprintf (_T("%6d"), ConsumerCount[i]);
_tprintf (_T("\nNumber of Producer iterations, by thread\n"));
for (i = 0; i < NumProducers; i++) _tprintf (_T("%6d"), ProducerCount[i]);
_tprintf (_T("\n**************\n"));
ReleaseMutex (hTestMutex);
ReleaseMutex (hSemMW->hMutex);
Sleep ((DWORD)Delay);
}
_endthreadex(0);
return 0;
}
static BOOL WINAPI CtrlcHandler (DWORD CtrlEvent)
{
DWORD PrevCount;
LONG i;
if (CtrlEvent == CTRL_C_EVENT) {
WaitForSingleObject (hTestMutex, INFINITE);
_tprintf (_T("Control-c received. Shutting down.\n"));
ReleaseMutex (hTestMutex);
Exit = TRUE;
/* Assure that all Consumer threads are not blocked waiting on
a semaphore so that they have a chance to shut down. */
for (i = 0; i < NumConsumers; i++) {
ReleaseSemaphoreMW (hSemMW, 1, &PrevCount);
/* Release the CPU so that the consumer can be scheduled */
Sleep (0);
}
return TRUE;
}
else return FALSE;
}
|