
Windows System Programming


AUTHOR: Johnson M. Hart, jmhart62@gmail.com

© JMH Associates, 2000. This material is supplied as a supplement to
Windows System Programming. You can purchase a copy at amazon.com.


Multiple Wait Semaphore

Sept. 8, 2000: Changed to use the broadcast condition variable model. (See Chapter 10 in the Second Edition.)

Page 235 in the book mentioned that you could atomically wait for multiple semaphore units by using a mutex to guard the semaphore. This is correct, but limited. I call this "First Come, First Served" multiple wait semantics, as a large request will block all subsequent requests (regardless of thread priority or request size) until the initial request is satisfied. This might be the semantics you require, but you might instead require "First Satisfiable" semantics, where a small request can be satisfied ahead of a large request that is still waiting.
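
The difference between the two policies can be illustrated with a small single-threaded simulation. This is only a sketch and not part of the library: the helper name grant_requests and its parameters are hypothetical, and it models a queue of pending requests in arrival order rather than real threads.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper (not part of the library below).
   req[0..n-1] holds the pending requests in arrival order and `units` is
   the number of semaphore units currently available. granted[i] is set
   to 1 if request i would be satisfied now, and to 0 otherwise.
   Returns the number of requests granted. */
static int grant_requests(const int *req, size_t n, int units,
                          int first_come_first_served, int *granted)
{
    int count = 0;
    size_t i;

    assert(req != NULL && granted != NULL && units >= 0);
    for (i = 0; i < n; i++) granted[i] = 0;

    for (i = 0; i < n; i++) {
        if (req[i] <= units) {
            units -= req[i];        /* Satisfiable: hand over the units. */
            granted[i] = 1;
            count++;
        } else if (first_come_first_served) {
            break;   /* The unsatisfied request blocks everyone behind it. */
        }
        /* First Satisfiable: skip the large request and keep looking. */
    }
    return count;
}
```

With requests for 5, 1, and 2 units pending and 3 units available, the First Come, First Served policy grants nothing because the request for 5 units is at the head of the queue, whereas First Satisfiable grants the requests for 1 and 2 units.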

This small, integrated collection of functions implements the required routines. Here are some features:

  1. The CreateSemaphoreMW function has a flag to specify "First Satisfiable" or "First Come, First Served" semantics.
  2. You can also name the MW semaphore so that it can be shared between processes.
  3. The WaitForSemaphoreMW function has a parameter to specify the number of required units.
  4. The CloseSynchHandle function plays the same role as CloseHandle. It is intended to be general purpose so that new objects can be implemented over time.

How would you use such a capability? The First Edition Chapter 11's server showed one example, where different threads require different amounts of a limited serving capability. Here is another example, which is fairly realistic. Chapter 10 in the Second Edition takes the idea of a semaphore a step further by implementing a message queueing system.

Suppose you have a "Monitor" thread that is not to start (or get past a certain point) until ALL worker threads are running or have reached some specified point. Here is what to do, using a "First Satisfiable" multiple wait semaphore:

  1. Create the semaphore with an initial count of zero and a maximum count equal to the number of worker threads.
  2. Each worker thread releases one unit when it starts.
  3. The monitor thread waits atomically for the maximum count. Therefore, it will not start until all threads are running, preventing, perhaps, a race condition.
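
The monitor scenario can be sketched in portable C. Note the assumptions: this sketch uses POSIX threads (a mutex and a condition variable) as a stand-in for the Windows mutex and manual-reset event used by the library below, and the names mw_sem, mw_init, mw_release, mw_wait, and monitor_rendezvous are hypothetical. The semaphore starts with no units and has a maximum count equal to the number of workers, so each worker's release adds one unit and the monitor's wait for all units cannot complete until every worker has checked in.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* POSIX analogue of a First Satisfiable multiple wait semaphore
   (an assumption for illustration, not the Windows implementation). */
typedef struct {
    pthread_mutex_t lock;      /* protects count */
    pthread_cond_t  changed;   /* broadcast whenever count increases */
    long count, max;
} mw_sem;

static void mw_init(mw_sem *s, long initial, long max)
{
    assert(0 <= initial && initial <= max);
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->changed, NULL);
    s->count = initial;
    s->max = max;
}

/* Add n units; returns 0 on success, -1 if the maximum would be exceeded. */
static int mw_release(mw_sem *s, long n)
{
    int rc = -1;
    pthread_mutex_lock(&s->lock);
    if (n > 0 && s->count + n <= s->max) {
        s->count += n;
        pthread_cond_broadcast(&s->changed);  /* every waiter retests */
        rc = 0;
    }
    pthread_mutex_unlock(&s->lock);
    return rc;
}

/* Block until n units are available, then take all n atomically. */
static void mw_wait(mw_sem *s, long n)
{
    pthread_mutex_lock(&s->lock);
    while (s->count < n)
        pthread_cond_wait(&s->changed, &s->lock);
    s->count -= n;
    pthread_mutex_unlock(&s->lock);
}

static void *worker(void *arg)
{
    mw_release((mw_sem *)arg, 1);   /* "I am running" */
    /* ... the worker's real work would follow here ... */
    return NULL;
}

/* Start nworkers threads and proceed only after all have checked in.
   Returns the units left in the semaphore afterwards. */
static long monitor_rendezvous(int nworkers)
{
    enum { MAX_WORKERS = 64 };
    pthread_t th[MAX_WORKERS];
    mw_sem sem;
    long left;
    int i;

    assert(nworkers > 0 && nworkers <= MAX_WORKERS);
    mw_init(&sem, 0, nworkers);              /* starts empty */
    for (i = 0; i < nworkers; i++)
        if (pthread_create(&th[i], NULL, worker, &sem) != 0)
            abort();                         /* no recovery in this sketch */

    mw_wait(&sem, nworkers);                 /* all units, atomically */
    /* The monitor's own work would start here. */

    for (i = 0; i < nworkers; i++)
        pthread_join(th[i], NULL);
    left = sem.count;
    pthread_cond_destroy(&sem.changed);
    pthread_mutex_destroy(&sem.lock);
    return left;
}
```

Because the monitor takes all the units in one wait, no units remain once it has been released. The broadcast in mw_release plays the role that pulsing the manual-reset event plays in the Windows code.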

Here is an exercise involving the above idea and an extra event together with a "First Come, First Served" semaphore. Construct a "thread rendezvous" where all worker threads block at a certain point and they are not released until the monitor finds that some number of them have blocked at the rendezvous. Can you construct a rendezvous object that does not require the monitor thread?

Here is another exercise. Construct a "reverse semaphore" object that is signaled if and only if the count is equal to the maximum value. A waiting thread that is released decreases the semaphore by the count specified in the wait and the reverse semaphore becomes unsignaled. You can do this with some minor modifications to the functions here. See http://www.microsoft.com/msj/0797/win320797top.htm for a different solution, although, in that article, the semaphore is signaled only when it is 0, and there is no multiple wait.

The implementation has some other interesting features:

  1. A synchronization handle structure is defined in the header file. It requires an event and two mutexes. You will see why all three are required. Notice that this means that it is possible to implement a semaphore with the other two objects.
  2. The WaitForSemaphoreMW function has at least one non-obvious step; take a look at the code. Also, this function is efficient; there is no busy waiting. Again, take a look at the code to see how this is achieved. This solution is an example of the "condition variable model" described in Chapter 10 (Second Edition).
  3. The choice of pulsing a manual reset event is essential; again, take a look at the comments.
  4. Interprocess synchronization requires that there be synchronization handle structures in shared memory to maintain the object state. However, there must also be a "shadow" structure in the process-private memory containing handles to the mutexes and event. Notice how this works and how shared memory objects are released for reuse after the last open handle is released.
  5. The test program shows a multiple producer/consumer model. When you bring it up, you are prompted for the number of producer threads (in this test process) and the number of consumer threads, as well as the object name. To test the interprocess synchronization, bring up two separate processes running the same test program. Use the same name for each, and have only producer threads in one (zero consumer threads) and only consumer threads in the other.
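
The shared-memory bookkeeping described in item 4 can be sketched without any Windows calls. This is a simplified model under stated assumptions: the table is an ordinary array standing in for the mapped region, the names shared_slot, local_handle, open_named, and close_named are hypothetical, the caller is assumed to hold a lock that protects the whole table, and a slot whose reference count has dropped to zero is treated as free and is never matched by name.

```c
#include <assert.h>
#include <string.h>

#define MAX_SLOTS 8      /* hypothetical table size */
#define NAME_LEN  32     /* hypothetical name limit */

/* State that would live in shared memory: one slot per named object. */
typedef struct {
    long ref_count;          /* 0 means the slot is free */
    long cur_count;          /* example of shared object state */
    char name[NAME_LEN];
} shared_slot;

/* Process-private "shadow"; it would also hold the per-process handles. */
typedef struct {
    shared_slot *shared;     /* points into the shared table */
} local_handle;

/* Find the slot for `name`, or claim the first free one.
   Sets *is_new so the caller knows whether to initialize the state.
   Returns 0 on success, -1 if the table is full. */
static int open_named(shared_slot *table, const char *name,
                      local_handle *h, int *is_new)
{
    shared_slot *first_free = NULL, *found = NULL;
    int i;

    assert(table != NULL && name != NULL && strlen(name) < NAME_LEN);
    for (i = 0; i < MAX_SLOTS; i++) {
        if (table[i].ref_count <= 0) {
            if (first_free == NULL) first_free = &table[i];
        } else if (strcmp(table[i].name, name) == 0) {
            found = &table[i];          /* name exists and is in use */
            break;
        }
    }
    *is_new = (found == NULL);
    if (found == NULL) {
        if (first_free == NULL) return -1;   /* no free slot */
        found = first_free;
        found->ref_count = 0;
        strcpy(found->name, name);
    }
    found->ref_count++;
    h->shared = found;
    return 0;
}

/* Drop this handle's reference; the slot becomes reusable at zero. */
static void close_named(local_handle *h)
{
    h->shared->ref_count--;
    h->shared = NULL;
}
```

Opening the same name twice yields two local handles that point at one shared slot with a reference count of 2. After both are closed, the slot can be claimed by an object with a different name.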

COMMENTS ON TESTING, DEBUGGING, AND VALIDATING

Threaded, synchronized systems present significant challenges not only in the design and implementation, but also in all stages of quality assurance. There are many traps for the unwary. However, writing good code need not be that difficult, and it can be fun and rewarding. Here are some hints as well as some personal opinions. Some of the personal opinions are not widely shared.

  1. Testing is necessary, but NOT sufficient, to assure correct operation. This can be said of any program, but it is particularly so when synchronization issues are involved. There can be any number of subtle race conditions, deadlocks, and so on that may never appear in testing, and, if they do, they may not be repeatable or easy to diagnose.
  2. The same can be said of debugging. I never used the debugger on these examples despite the fact that Visual C++ has excellent debugging capabilities. Debugging also changes the timing of threads and can mask the defects you are trying to find. Finally, the VC++ debugger actually changes the behavior of functions such as PulseEvent; Microsoft has a discussion of this at http://support.microsoft.com/support/kb/articles/Q173/2/60.ASP.
  3. It is absolutely necessary to design and implement the system correctly.
  4. Furthermore, you must carefully read and analyze your synchronization code and consider all the possibilities. For example, after every wait or release call, ask what would happen if the thread were preempted at that instant and another thread were to execute the same code. Furthermore, ask if all the conditions that you expect to hold at this point can be assured to hold. For example, consider the potential race condition in Chapter 10's sortMT program. My initial implementations of the program here and of the sortMT program had defects that never showed up in tests but became apparent when I went through these thought experiments.
  5. Ultimately, you must prove to yourself (and, perhaps, to your colleagues) that the code is correct. The proof may not be formal (although formal proofs can play a role), but it is worthwhile going through the motions.
  6. In my personal experience, even the best programmers engage in extreme forms of arm waving when discussing synchronization, and the buzz words fly furiously. Try not to engage in such activity, and be suspicious of anyone who does.
  7. Haste is the biggest enemy. Take the time to assure yourself that the code is correct.

I have seen some on-line discussion that emphasizes this point. Under the debugger, PulseEvent had a different effect than without the debugger. Apparently, when pulsing a manual reset event, the signal could be lost under certain circumstances when using the debugger.

I plan to add some material regarding informal proofs of correctness for dealing with synchronized threads.

Now that I have said that, I fully expect to be humbled when someone points out an obvious (or not so obvious) defect in my code. Please take the challenge to do so. If you succeed, I will still be grateful and will make the excuse that I was in a hurry (see the last bullet) even though I did perform all the recommended steps. Jan 5, 2000: I just found a bug after 18 months using this code. It is now fixed in the code below.

This is the function library.


/* SynchObj.c

   Copyright (c) 1998, Johnson M. Hart
   Permission is granted for any and all use providing that this copyright is
   properly acknowledged.
   There are no assurances of suitability for any use whatsoever.

   Library of synchronization "objects" to supplement the standard Windows
   events and mutexes.

   For simplicity, this implementation shares a header file (synchize.h) with
   another library of synchronization objects (EvMxSem.c) that is designed
   to operate under Windows CE, which does not have semaphores.

   Implementation notes:
   1. All required internal data structures are allocated on the process's heap
   2. Where appropriate, a new error code is returned (see the header
      file), or, if the error is a Windows error, that code is unchanged.
   3. Notice the new handle type, "SYNCHHANDLE" which has handles, counters,
      and other information. This structure will grow as new objects are added
      to this set; some members are specific to only one or two of the objects;
      in particular, the structure is more general than is required just for
      some of the specific examples.
   4. Mutexes are used for critical sections. These could be replaced with
      CRITICAL_SECTION objects if the objects are used totally within
      a process. Even better, if the object is not named, then you
      know that it will be used only within the process and you could
      make the decision at create time. HOWEVER, you will lose the timeout
      option if you do so, AND you will also lose the naming and interprocess
      capabilities. With Windows NT4.0 and later, SignalObjectAndWait is a possibility.
   5. These simulated semaphores can be named and hence can be shared
      between processes.
   6. No Windows semaphores are used, so this implementation (with null names)
      will operate under Windows CE.
   7. The implementation shows several interesting aspects of synchronization, some
      of which are specific to Windows and some of which are general. These are pointed
      out in the comments as appropriate.
   8. These objects have a WaitForSingleObject equivalent. There is, however, no
      equivalent to WaitForMultipleObjects as this is very difficult, if not impossible
      to do efficiently outside of the kernel.
*/

#include "EvryThng.h"
#include "Synchize.h"

static SYNCHHANDLE CleanUp (SYNCHHANDLE);
static SYNCHHANDLE AllocSynchHandle (LPCTSTR, LPTSTR, LPTSTR, LPTSTR, LPBOOL);
/*********************************************
   MULTIPLE WAIT SEMAPHORE function family.
   These semaphores allow a thread to wait atomically for several "units"
   rather than just one. The DEFAULT wait semantics are considered to be 
   "First Satisfiable". That is, a thread with a satisfiable request
   will be released even if a thread (regardless of priority) has an
   outstanding request that is not currently satisfiable.

   Optional semantics, specified at create time, are
   "First Come, First Served". A thread with request larger than the
   number of units currently available will block all subsequent
   threads, even those with satisfiable requests or those of
   higher priority. The solution on p. 235 is a First Come, First Served
   solution, but here we implement it within the general framework.
*/

SYNCHHANDLE CreateSemaphoreMW (

   LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,    /* pointer to security attributes */
   LONG lInitialCount,     /* initial count */
   LONG lMaximumCount,     /* maximum count */
   BOOL fFirstComeFirstServed,     /* First Satisfiable is the default */
   LPCTSTR lpName )

/* Multiple wait semaphore creation.
   Requires a counter, a mutex to protect the semaphore state, and a
   manual-reset event.

   Here are the rules that must always hold between the manual-reset event
   and the mutex (any violation of these rules by the multiple wait semaphore
   functions will, in all likelihood, result in a defect):
      1. No thread can set, pulse, or reset the event,
         nor can it access any part of the SYNCHHANDLE structure,
         without first gaining ownership of the mutex.
         BUT, a thread can wait on the event without owning the mutex
         (this is clearly necessary or else the event could never be set).
      2. The event is in a signaled state if and only if the count has just been
         changed so that it is greater than zero. To assure this property, the count should
         be checked after every semaphore decrease.
      3. The semaphore count is always >= 0 and <= the maximum count
*/
{
   SYNCHHANDLE hSynch = NULL, hShare = NULL;
   TCHAR MutexName [MAX_PATH] = _T(""), EventName [MAX_PATH] = _T(""),
      MutexaName[MAX_PATH] = _T("");
   BOOL NewObject;

   if (lInitialCount > lMaximumCount || lMaximumCount < 0 || lInitialCount < 0) {
           /* Bad parameters */
      SetLastError (SYNCH_ERROR);
      return NULL;
   }

   hSynch = AllocSynchHandle (lpName, MutexName, EventName, MutexaName, &NewObject);
   if (hSynch == NULL) {
      SetLastError (SYNCH_ERROR);
      return NULL;
   }

   /* Create the object handles. These are always created in the process's
      local handle. */

   hSynch->hMutex = CreateMutex (lpSemaphoreAttributes, FALSE, (LPCTSTR)MutexName);

   /* Create the event. It is initially signaled if and only if the
      initial count is > 0 */
   hSynch->hEvent = CreateEvent (lpSemaphoreAttributes, TRUE   /* manual reset */,
      lInitialCount > 0, (LPCTSTR)EventName);

   hSynch->hMutexa = NULL;
   hSynch->dwFlags = 6;    /* An event and a mutex, but no secondary mutex
                       unless it is a First Come, First Served Multiple Wait semaphore */
   if (fFirstComeFirstServed) {
      hSynch->hMutexa = CreateMutex (lpSemaphoreAttributes, FALSE, (LPCTSTR)MutexaName);
      hSynch->dwFlags = 7;    /* All three objects were created */
   }

   /* Set the object state, always in the local handle (for quick reference)
      and in the shared handle if there is one. */

   hSynch->MaxCount = lMaximumCount;
   hSynch->CurCount = lInitialCount;   /* The local value is not maintained. */
   hSynch->fFirstComeFirstServed = fFirstComeFirstServed;
   if (lpName != NULL) _tcscpy (hSynch->lpName, lpName);   /* Unnamed objects may pass NULL */

   hShare = hSynch->SharedHandle;
   if (NewObject && hShare != NULL ) { 
             /* There is a new shared handle. Set the state if it is new */
      hShare->MaxCount = lMaximumCount;
      hShare->CurCount = lInitialCount;    /* The local value is not maintained. */
      hShare->fFirstComeFirstServed = fFirstComeFirstServed;
      _tcscpy (hShare->lpName, lpName);
   }

   /* Return with the handle, or, if there was any error, return
      a null after closing any open handles and freeing any allocated memory */

   return CleanUp (hSynch);
}

BOOL ReleaseSemaphoreMW (SYNCHHANDLE hSemMW, LONG cReleaseCount, LPLONG lpPreviousCount)
/* Multiple wait equivalent to ReleaseSemaphore */
{
   BOOL Result = TRUE;
   SYNCHHANDLE hState;

   /* Gain access to the object to assure that the release count
      would not cause the total count to exceed the maximum */

   /* The state is maintained locally if the object is unnamed and
      in shared memory for a named object */

   hState = (hSemMW->SharedHandle == NULL) ? hSemMW : hSemMW->SharedHandle;
   _try {
      WaitForSingleObject (hSemMW->hMutex, INFINITE);
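      /* As with ReleaseSemaphore, the previous count pointer is optional */
      if (lpPreviousCount != NULL) *lpPreviousCount = hState->CurCount;
      if (hState->CurCount + cReleaseCount > hState->MaxCount || cReleaseCount <= 0) {
         SetLastError (SYNCH_ERROR);
         Result = FALSE;
         _leave;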
      }
      hState->CurCount += cReleaseCount;

      /* Pulse the manual reset. All threads currently waiting on the
         event will be released, and then the event will be reset. */
      PulseEvent (hSemMW->hEvent);
   }
   _finally {
      ReleaseMutex (hSemMW->hMutex);
      return Result;
   }
}

DWORD WaitForSemaphoreMW (SYNCHHANDLE hSemMW, LONG cSemRequest, DWORD dwMilliseconds)
/* Multiple wait semaphore equivalent of WaitForSingleObject.
   The second parameter is the number of units requested, and the waiting will be
   either first come, first served or first available, depending on the option
   selected at create time. */
{
   DWORD WaitResult, TimeOut = min (dwMilliseconds, CV_TIMEOUT);
   /* CV_TIMEOUT is required for the broadcast condition variable model.
   See Chapter 10 in the Second Edition. The nominal value is 50 (milliseconds). */
   SYNCHHANDLE hState;

   /* The state is maintained locally if the object is unnamed and
      in shared memory for a named object */

   hState = (hSemMW->SharedHandle == NULL) ? hSemMW : hSemMW->SharedHandle;

   if (cSemRequest <= 0 || cSemRequest > hState->MaxCount) {
      SetLastError (SYNCH_ERROR);
      return WAIT_FAILED;
   }

   /* If this is a First Come, First Served MW semaphore, then this thread
      seizes the secondary mutex to block all other threads.
      Do this BEFORE waiting on the mutex protecting the semaphore state. */
   if (hSemMW->fFirstComeFirstServed) {
      WaitResult = WaitForSingleObject (hSemMW->hMutexa, dwMilliseconds);
      if (WaitResult != WAIT_OBJECT_0 && WaitResult != WAIT_ABANDONED_0) return WaitResult;
   }

   WaitResult = WaitForSingleObject (hSemMW->hMutex, dwMilliseconds);
   if (WaitResult != WAIT_OBJECT_0 && WaitResult != WAIT_ABANDONED_0) return WaitResult;

   while (hState->CurCount < cSemRequest) { 
      /* The count is less than the number requested.
         The thread must wait on the event (which, by the rules, is currently reset)
         for semaphore resources to become available. First, of course, the mutex
         must be released so that another thread will be capable of setting the event.
      */
      ReleaseMutex (hSemMW->hMutex);
      /* Wait for the event, indicating that some other thread has increased
         the semaphore count.
         The event is manual-reset and is pulsed with PulseEvent in
         ReleaseSemaphoreMW, so all threads currently waiting will be released.
      */
      WaitResult = WaitForSingleObject (hSemMW->hEvent, dwMilliseconds);
      if (WaitResult != WAIT_OBJECT_0) return WaitResult;  /* Add logic: go back and try the while again! */

      /* Seize the semaphore so that the semaphore state can be retested
         at the top of the loop. Note that there may be other threads
         waiting at this same point, but only one at a time can test the
         semaphore count. */
      WaitResult = WaitForSingleObject (hSemMW->hMutex, dwMilliseconds);
      if (WaitResult != WAIT_OBJECT_0 && WaitResult != WAIT_ABANDONED_0) return WaitResult;
   }

   /* hState->CurCount >= cSemRequest (the request can be satisfied), and
      this thread owns the semaphore */
   hState->CurCount -= cSemRequest;
   if (hState->CurCount) SetEvent (hSemMW->hEvent);
   ReleaseMutex (hSemMW->hMutex);
   if (hSemMW->fFirstComeFirstServed) ReleaseMutex (hSemMW->hMutexa);

   return WaitResult;

}

BOOL CloseSynchHandle (SYNCHHANDLE hSynch)
/* Close a synchronization handle. 
   Improvement: Test for a valid handle before closing the handle */
{
   BOOL Result = TRUE;

   /* Attempt every close, even if an earlier one fails */
   if (hSynch->hEvent  != NULL && !CloseHandle (hSynch->hEvent))  Result = FALSE;
   if (hSynch->hMutex  != NULL && !CloseHandle (hSynch->hMutex))  Result = FALSE;
   if (hSynch->hMutexa != NULL && !CloseHandle (hSynch->hMutexa)) Result = FALSE;
   if (hSynch->SharedHandle != NULL) 
      InterlockedDecrement (&(hSynch->SharedHandle->RefCount));
   if (hSynch->ViewOfFile != NULL) UnmapViewOfFile (hSynch->ViewOfFile);
   HeapFree (GetProcessHeap (), 0, hSynch);
   return (Result);
}

static SYNCHHANDLE CleanUp (SYNCHHANDLE hSynch)
{ /* Prepare to return from a create of a synchronization handle.
      If there was any failure, free any allocated resources
      "Flags" indicates which Windows objects are required in the 
      synchronization handle */
   BOOL ok = TRUE;
   DWORD Flags;

   if (hSynch == NULL) return NULL;
   Flags = hSynch->dwFlags;
   /* Parenthesize the mask tests: == binds more tightly than & */
   if ((Flags & 4) != 0 && hSynch->hEvent ==  NULL) ok = FALSE;
   if ((Flags & 2) != 0 && hSynch->hMutex ==  NULL) ok = FALSE;
   if ((Flags & 1) != 0 && hSynch->hMutexa == NULL) ok = FALSE;
   if (!ok) {
      CloseSynchHandle (hSynch);
      return NULL;
   }

   /* Everything worked */
   return hSynch;
}

static SYNCHHANDLE AllocSynchHandle (LPCTSTR lpName, 
   LPTSTR MutexName, LPTSTR EventName, LPTSTR MutexaName,
   LPBOOL pfNewObject)
/* Allocate memory for a synchronization handle. Unnamed objects
   have their handles created directly in the process heap, whereas
   named objects have two handles, one allocated locally to
   contain the handles (which are local to this process) and
   out of a shared memory pool mapped to the paging file for
   the shared object state.
   Also, create the names for the three internal objects
   and determine whether this is a new object that should be initialized. */
{
   HANDLE hSynchDB;   /*  Mutex to protect the entire synchronization object database */
   HANDLE hMap, hFile = INVALID_HANDLE_VALUE;
      /* Shared memory and file handle for maintaining synch objects */
   BOOL FirstTime;
   SYNCHHANDLE pView, pNew = NULL, pFirstFree, hLocal;

   hLocal = HeapAlloc (GetProcessHeap(), HEAP_ZERO_MEMORY, SYNCH_HANDLE_SIZE);
   *pfNewObject = TRUE;
   if (hLocal == NULL || lpName == NULL
    || _tcscmp (lpName, _T("")) == 0) /*  The object is not named */
      return hLocal;

   /* The object is named. Create names for the internal objects. */
   _stprintf (MutexName,  _T("%s%s"), lpName, _T(".mtx"));
   _stprintf (EventName,  _T("%s%s"), lpName, _T(".evt"));
   _stprintf (MutexaName, _T("%s%s"), lpName, _T(".mtxa"));

   /* Lock access to the synchronization object data base to prevent other threads
      from concurrently creating another object of the same name.
      All processes and threads use this same well-known mutex name. */

   hSynchDB = CreateMutex (NULL, FALSE, SYNCH_OBJECT_MUTEX);
   WaitForSingleObject (hSynchDB, INFINITE);

   /*  Access the shared memory where the synchronization objects are maintained.
      It is necessary, however, first to check if this is the first time
      that an object has been created so that the shared memory-mapped 
      table can be initialized.
      The test is achieved with an OpenFileMapping call. */

   _try {
      hMap = OpenFileMapping (FILE_MAP_WRITE, FALSE, SYNCH_FM_NAME);
      FirstTime = (hMap == NULL);
      if (FirstTime)
         hMap = CreateFileMapping (hFile, NULL, PAGE_READWRITE, 0, SIZE_SYNCH_DB, SYNCH_FM_NAME);
      if (hMap == NULL) _leave;
      pView = (SYNCHHANDLE)MapViewOfFile (hMap, FILE_MAP_WRITE, 0, 0, SIZE_SYNCH_DB);
      if (pView == NULL) _leave;
      if (FirstTime) memset (pView, 0, SIZE_SYNCH_DB);

      /* Search to see if an object of this name already exists.
         The first entry of the mapped region is reserved for bookkeeping, in
         case it is ever needed in the future. An empty slot
         is detected by a 0 reference count. */
      pFirstFree = NULL;
      for (pNew = pView+1; pNew < pView + SYNCH_MAX_NUMBER; pNew++) {
        if ((pFirstFree == NULL) && (pNew->RefCount <= 0)) pFirstFree = pNew;
        if ((pNew->RefCount > 0)   /* Ignore names left behind in freed slots */
           && _tcscmp (pNew->lpName, lpName) == 0) break; /* Name exists */
      }

      if (pNew < pView + SYNCH_MAX_NUMBER) { /* The name exists */
         *pfNewObject = FALSE;
      } else if (pFirstFree != NULL) {
             /* The name does not exist, but we have found an empty slot. */
           *pfNewObject = TRUE;
           pNew = pFirstFree;
      } else { /*  The name does not exist, but there is no free slot. */
           pNew = NULL;
           *pfNewObject = TRUE;
      }
   }

   _finally {
      if (pNew != NULL) hLocal->ViewOfFile = pView;
      hLocal->SharedHandle = pNew;
      if (hLocal->SharedHandle != NULL)
         InterlockedIncrement (&(hLocal->SharedHandle->RefCount));
      ReleaseMutex (hSynchDB);
      return hLocal;
   }
}

This is the header file.


/* synchize.h - header file to go with Synchize.c */

typedef struct _SYNCH_HANDLE_STRUCTURE {
   HANDLE hEvent;
   HANDLE hMutex;
   HANDLE hMutexa;
   LONG MaxCount;
   volatile LONG CurCount;
   LONG RefCount;         /* Number of references to a shared handle */
   BOOL fFirstComeFirstServed;   /* For multiple wait semaphores */
   DWORD dwFlags;         /* Used as required */
   LPVOID ViewOfFile;     /* For named objects, this is the mapped file
                             view containing the handle */
   struct _SYNCH_HANDLE_STRUCTURE *SharedHandle; /* For named objects, this is the 
                             shared part of the handle, containing
                             the handle state */
   TCHAR lpName[MAX_PATH];
} SYNCH_HANDLE_STRUCTURE, *SYNCHHANDLE;

#define SYNCH_HANDLE_SIZE sizeof (SYNCH_HANDLE_STRUCTURE)
#define SYNCH_MAX_NUMBER 1000
#define SIZE_SYNCH_DB (SYNCH_MAX_NUMBER*SYNCH_HANDLE_SIZE)

#define SYNCH_OBJECT_MUTEX _T("SynchObjectMutex")
#define SYNCH_FM_NAME _T("SynchSharedMem")


/* Error codes - all must have bit 29 set */
#define SYNCH_ERROR 0X20000000     /* EXERCISE - REFINE THE ERROR NUMBERS */

SYNCHHANDLE CreateSemaphoreCE (LPSECURITY_ATTRIBUTES, LONG, LONG, LPCTSTR);

BOOL ReleaseSemaphoreCE (SYNCHHANDLE, LONG, LPLONG);

DWORD WaitForSemaphoreCE (SYNCHHANDLE, DWORD);

BOOL CloseSynchHandle (SYNCHHANDLE);

SYNCHHANDLE CreateSemaphoreMW (LPSECURITY_ATTRIBUTES, LONG, LONG, BOOL, LPCTSTR);

BOOL ReleaseSemaphoreMW (SYNCHHANDLE, LONG, LPLONG);

DWORD WaitForSemaphoreMW (SYNCHHANDLE, LONG, DWORD);

This is the test program.


/* Test the Multiple Wait Semaphore synchronization compound object */

#include "EvryThng.h"
#include "Synchize.h"

static DWORD WINAPI ProducerTh (LPVOID);
static DWORD WINAPI ConsumerTh (LPVOID);
static DWORD WINAPI MonitorTh (LPVOID);
static BOOL WINAPI CtrlcHandler (DWORD);

static SYNCHHANDLE hSemMW;
static LONG SemMax, SemInitial, NumProducers, NumConsumers, TotalExcess = 0;
static HANDLE hTestMutex;
static volatile BOOL Debug = FALSE;

static volatile BOOL Exit = FALSE;

static volatile DWORD NumSuccess = 0, NumFailures = 0, NumProduced = 0,
       FailureCount = 0, NumConsumed = 0;
static LPDWORD ProducerCount, ConsumerCount;

int _tmain (int argc, LPTSTR argv[])
   /* Test multiple wait semaphores; that is, semaphores created out of
      mutexes, a counter, and a manual-reset event */

{
   LPHANDLE Producer = NULL, Consumer = NULL;
   LONG iThread;
   DWORD ThreadId;
   HANDLE hMonitor;
   BOOL FirstCFS;     /* First come, first served, or first available? */
   TCHAR Name [MAX_PATH] = _T("");

   FILETIME FileTime; /* Times to seed random #s. */
   SYSTEMTIME SysTi;

   /* Randomly initialize the random number seed */

   GetSystemTime (&SysTi);
   SystemTimeToFileTime (&SysTi, &FileTime);
   srand (FileTime.dwLowDateTime);

   if (!SetConsoleCtrlHandler (CtrlcHandler, TRUE))
      ReportError (_T("Failed to set console control handler"), 1, TRUE);

   _tprintf (_T("\nEnter SemMax, SemInitial, NumConsumers, NumProducers, FCFS?, Debug: "));
   _tscanf (_T("%d %d %d %d %d %d"), &SemMax, &SemInitial, &NumConsumers,
      &NumProducers, &FirstCFS, &Debug);
   _tprintf (_T("\nEnter Name - NULL for none: "));
   _tscanf (_T("%s"), Name);
   if (_tcscmp (Name, _T("NULL")) == 0) _tcscpy (Name, _T(""));

   /* if (Debug) */ _tprintf (_T("You entered: %d %d %d %d %d %s\n"),
      SemMax, SemInitial, NumConsumers, NumProducers, FirstCFS, Name);

   /* Create a mutex to synchronize various aspects of the test, such as
      updating statistics. A CS won't work as we need a WaitForMultipleObjects
      involving this mutex in the monitor thread. */
   hTestMutex = CreateMutex (NULL, FALSE, NULL);
   if (hTestMutex == NULL) ReportError (_T("Could not create test mutex"), 2, TRUE);

   NumProduced = SemInitial; /* Initialize the usage statistics for the semaphore. */

   hSemMW = CreateSemaphoreMW (NULL, SemInitial, SemMax, FirstCFS, Name);
   if (hSemMW == NULL) {
      _tprintf (_T("LastError: %x\n"), GetLastError()); 
      ReportError (_T("Failed to create MW semaphore"), 3, TRUE);
   }
   if (Debug) _tprintf (_T("MW Semaphore created successfully\n"));

   /* Create all the semaphore consuming and releasing threads */
   /* Create arrays to hold the thread handles, and also
      create integer arrays to count number of iterations by each thread. By
      observing these counts, we can be sure that no deadlocks have occurred. */

   if (NumConsumers > 0) {
      Consumer = malloc (NumConsumers*sizeof(HANDLE));
      if (Consumer == NULL)
         ReportError (_T("Cannot allocate Consumer handles"), 5, FALSE);
      ConsumerCount = HeapAlloc (GetProcessHeap(), HEAP_ZERO_MEMORY,
                NumConsumers*sizeof(DWORD));
      if (ConsumerCount == NULL)
         ReportError (_T("Cannot allocate Consumer counters"), 5, FALSE);
   }

   if (NumProducers > 0) {
      Producer = malloc (NumProducers*sizeof(HANDLE));
      if (Producer == NULL)
         ReportError (_T("Cannot allocate Producer handles"), 4, FALSE);
      ProducerCount = HeapAlloc (GetProcessHeap(), HEAP_ZERO_MEMORY,
               NumProducers*sizeof(DWORD));
      if (ProducerCount == NULL)
         ReportError (_T("Cannot allocate Producer counters"), 4, FALSE);
   }

   hMonitor = (HANDLE)_beginthreadex (NULL, 0, MonitorTh, (LPVOID)5000,
            CREATE_SUSPENDED, &ThreadId);
   if (hMonitor == NULL)
      ReportError (_T("Cannot create monitor thread"), 6, TRUE);
   SetThreadPriority (hMonitor, THREAD_PRIORITY_HIGHEST);

   for (iThread = 0; iThread < NumConsumers; iThread++) {
      Consumer [iThread] = (HANDLE)_beginthreadex (NULL, 0, ConsumerTh,
            (LPVOID)iThread, CREATE_SUSPENDED, &ThreadId);
      if (Consumer[iThread] == NULL)
         ReportError (_T("Cannot create consumer thread"), 3, TRUE);
   }
   for (iThread = 0; iThread < NumProducers; iThread++) {
      Producer [iThread] = (HANDLE)_beginthreadex (NULL, 0, ProducerTh,
            (LPVOID)iThread, CREATE_SUSPENDED, &ThreadId);
      if (Producer[iThread] == NULL)
         ReportError (_T("Cannot create producer thread"), 3, TRUE);
   }

   WaitForSingleObject (hTestMutex, INFINITE);
   _tprintf (_T("All threads created successfully.\n"));
   ReleaseMutex (hTestMutex);
   for (iThread = 0; iThread < NumConsumers; iThread++)
      ResumeThread (Consumer [iThread]);
   for (iThread = 0; iThread < NumProducers; iThread++)
      ResumeThread (Producer [iThread]);

   ResumeThread (hMonitor);

   if (NumConsumers > 0)
      WaitForMultipleObjects (NumConsumers, Consumer, TRUE, INFINITE);
   if (NumProducers > 0)
      WaitForMultipleObjects (NumProducers, Producer, TRUE, INFINITE);
   WaitForSingleObject (hMonitor, INFINITE);

   for (iThread = 0; iThread < NumConsumers; iThread++)
      CloseHandle (Consumer [iThread]);
   for (iThread = 0; iThread < NumProducers; iThread++)
      CloseHandle (Producer [iThread]);

   free (Producer);   /* The handle arrays were allocated with malloc */
   free (Consumer);
   HeapFree (GetProcessHeap(), 0, ProducerCount); 
   HeapFree (GetProcessHeap(), 0, ConsumerCount);
   
   CloseHandle (hMonitor);
   if (!CloseSynchHandle (hSemMW))
      ReportError (_T("Failed closing synch handle"), 0, TRUE);

   _tprintf (_T("All threads terminated\n"));
   return 0;
}

static DWORD WINAPI ProducerTh (LPVOID ThNumber)
/* Producer thread:
   1. Compute for a random period of time.
   2. Produce a random number of units, uniformly distributed over [1, SemMax].
      The production returns with no units produced if it would cause the
      semaphore count to exceed the maximum (this is consistent with Windows
      semaphore semantics).
   3. Produce twice as quickly as consuming, because many productions will fail
      when the number of units is too large.
*/
{
   DWORD Id = (DWORD)(DWORD_PTR)ThNumber, Delay, i, k;
   LONG PrevCount = 0, RelCount = 0;

   WaitForSingleObject (hTestMutex, INFINITE);
   if (Debug) _tprintf (_T("Starting producer number %d.\n"), Id);
   ReleaseMutex (hTestMutex);

   while (!Exit) {
      /* Delay without necessarily giving up processor control.
         The delay is half the consumer's delay, so production attempts
         occur about twice as often as consumption attempts. This
         compensates for the release calls that fail. */
      Delay = rand() / 2; 
      for (i = 0; i < Delay; i++) k = rand()*rand() / 2; /* Waste time */
      if (rand() % 3 == 0)
         Sleep (rand()/(RAND_MAX/500));
                      /* Give up the processor 1/3 of the time
                         just to make the thread interaction more interesting */

      /* Divide by RAND_MAX+1 so that the result stays within [1, SemMax] */
      RelCount = (long)(((double)rand()/((double)RAND_MAX+1)) * SemMax)+1;

      if (!ReleaseSemaphoreMW (hSemMW, RelCount, &PrevCount)) {
         WaitForSingleObject (hTestMutex, INFINITE);
         if (Debug)
            _tprintf (_T("Producer #: %d Failed. PrevCount = %d RelCount = %d\n"),
            Id, PrevCount, RelCount);
         NumFailures++;      /* Maintain producer statistics */
         FailureCount += RelCount;
         ReleaseMutex (hTestMutex);
      } else {
         WaitForSingleObject (hTestMutex, INFINITE);
         if (Debug)
            _tprintf (_T("Producer #: %d Succeeded. PrevCount = %d, RelCount = %d\n"),
            Id, PrevCount, RelCount);
         NumSuccess++;
         NumProduced += RelCount;
         ReleaseMutex (hTestMutex);
      }
      ProducerCount[Id]++;    /* Number of producer iterations. */
   }

   _endthreadex (0);
   return 0;
}

static DWORD WINAPI ConsumerTh (LPVOID ThNumber)
/* Consumer thread:
   1. Compute for a random period of time.
   2. Request a random number of units, uniformly distributed over [1, SemMax],
      with a random timeout.
   3. Update the consumption statistics if the request is satisfied.
*/
{
   DWORD Id = (DWORD)(DWORD_PTR)ThNumber, Delay, i, k;
   LONG SeizeCount;

   WaitForSingleObject (hTestMutex, INFINITE);
   if (Debug) _tprintf (_T("Starting consumer number %d.\n"), Id);
   ReleaseMutex (hTestMutex);

   while (!Exit) {
      /* Delay without necessarily giving up processor control */
      Delay = rand();
      for (i = 0; i < Delay; i++) k = rand()*rand(); /* Waste time */
      if (rand() % 3 == 0) Sleep (rand()/(RAND_MAX/1000));
            /* Give up the processor 1/3 of the time
               just to make the thread interaction more interesting */

      /* Random request size */
      /* Divide by RAND_MAX+1 so that the result stays within [1, SemMax] */
      SeizeCount = (long)(((double)rand()/((double)RAND_MAX+1)) * SemMax)+1;

      if (WaitForSemaphoreMW (hSemMW, SeizeCount, rand()) != WAIT_OBJECT_0) {
        if (Debug) {
           WaitForSingleObject (hTestMutex, INFINITE);
           _tprintf (_T("Consumer #: %d Timed out waiting for %d units\n"),
               Id, SeizeCount);
           ReleaseMutex (hTestMutex);
        }
      } else { /* The semaphore units were obtained successfully - update statistics */
           WaitForSingleObject (hTestMutex, INFINITE);
           if (Debug) _tprintf (_T("Consumer #: %d Obtained %d units\n"), Id, SeizeCount);
           NumConsumed += SeizeCount;
           ReleaseMutex (hTestMutex);
      }
      ConsumerCount[Id]++;    /*  Number of consumer iterations. */
   }

   _endthreadex (0);
   return 0;
}

static DWORD WINAPI MonitorTh (LPVOID Delay)
/* Monitor thread - periodically check the statistics for consistency */
{
   LONG CurCount = 0, Max = 0, ExcessCount, i;
   SYNCHHANDLE hState;

   HANDLE hBoth[2] = {hTestMutex, hSemMW->hMutex}; 
 /* Reaching inside the opaque handle is cheating, but we need simultaneous
    access to both the semaphore state and the statistics in order to test
    consistency. Some consistency failures are still possible, however. The
    statistics are updated independently of the semaphore state, so another
    thread can change the semaphore before a producer or consumer updates the
    global statistics. Absolute consistency would require a critical section
    around the producer/consumer loop bodies, which would destroy the
    asynchronous operation required for testing.
    Even so, the general state of the semaphore can still be checked, and we
    can be sure that it has not been corrupted. */

   /* Get the handle to the semaphore state */
   hState = (hSemMW->SharedHandle == NULL) ? hSemMW : hSemMW->SharedHandle;

   while (!Exit) {
      WaitForMultipleObjects (2, hBoth, TRUE, INFINITE);
      CurCount = hState->CurCount;
      Max = hState->MaxCount;

      ExcessCount = NumProduced - (NumConsumed + CurCount);
                   /* Excess of production over consumption
                      which should average out to 0 */
      _tprintf (_T
         ("Monitor statistics:\nProduced: %d\nConsumed: %d\nCurrent: %d\nMaximum: %d\n"),
         NumProduced,  NumConsumed, CurCount, Max);
      /* For correctness, we must have: Produced == Consumed + CurCount */
      if (ExcessCount != 0 || CurCount < 0 || CurCount > SemMax || SemMax != Max) {
        _tprintf (_T("Discrepancy: %d\n"), ExcessCount);
        TotalExcess += ExcessCount;
      }
      else _tprintf (_T("Consistency test passed. Total excess count: %d\n"), TotalExcess);

      _tprintf (_T("Successful release calls: %d\nFailed release calls: %d\n"),
            NumSuccess, NumFailures);
      _tprintf (_T("Units not released:     %d\n"), FailureCount);
      _tprintf (_T("Number of Consumer iterations, by thread\n"));
      for (i = 0; i < NumConsumers; i++) _tprintf (_T("%6d"), ConsumerCount[i]);
      _tprintf (_T("\nNumber of Producer iterations, by thread\n"));
      for (i = 0; i < NumProducers; i++) _tprintf (_T("%6d"), ProducerCount[i]);

      _tprintf (_T("\n**************\n"));

      ReleaseMutex (hTestMutex);
      ReleaseMutex (hSemMW->hMutex);

      Sleep ((DWORD)(DWORD_PTR)Delay);
   }

   _endthreadex(0);
   return 0;
}

static BOOL WINAPI CtrlcHandler (DWORD CtrlEvent)
{
   LONG PrevCount;   /* Same type as the producer passes to ReleaseSemaphoreMW */
   LONG i;

   if (CtrlEvent == CTRL_C_EVENT) {
      WaitForSingleObject (hTestMutex, INFINITE);
      _tprintf (_T("Control-c received. Shutting down.\n"));
      ReleaseMutex (hTestMutex);
      Exit = TRUE;

      /* Ensure that no Consumer thread remains blocked waiting on
         the semaphore, so that every consumer has a chance to shut down. */
      for (i = 0; i < NumConsumers; i++) {
         ReleaseSemaphoreMW (hSemMW, 1, &PrevCount);
              /* Release the CPU so that the consumer can be scheduled */
         Sleep (0);
      }

      return TRUE;
   }
   else return FALSE;
}
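
The monitor's consistency test rests on one invariant: Produced == Consumed + CurCount. Here is a small, single-threaded sketch of that bookkeeping, which you can run on any platform. It is not the multiple wait semaphore implementation. The names ModelSem, model_release, model_try_seize, model_random_units, and model_run are hypothetical and exist only for this illustration. The model assumes an initial count of zero and never blocks.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical single-threaded model of the semaphore counters. */
typedef struct {
   long CurCount;   /* Units currently available */
   long MaxCount;   /* Upper bound on CurCount */
} ModelSem;

/* Release n units. As with a Windows semaphore, the call fails and
   changes nothing if the count would exceed the maximum. */
static int model_release (ModelSem *s, long n, long *prev)
{
   *prev = s->CurCount;
   if (n <= 0 || s->CurCount + n > s->MaxCount) return 0;
   s->CurCount += n;
   return 1;
}

/* Take all n units at once, or none at all (this model never blocks). */
static int model_try_seize (ModelSem *s, long n)
{
   if (n <= 0 || n > s->CurCount) return 0;
   s->CurCount -= n;
   return 1;
}

/* Request size in [1, max]; max must be positive. */
static long model_random_units (long max)
{
   return (long)(rand() % max) + 1;
}

/* Alternate release and seize attempts, keeping the same statistics as
   the test program. Returns Produced - (Consumed + CurCount). */
static long model_run (long max, int iterations)
{
   ModelSem s = { 0, max };
   long produced = 0, consumed = 0, prev, n;
   int i;

   for (i = 0; i < iterations; i++) {
      n = model_random_units (max);
      if (model_release (&s, n, &prev)) produced += n;
      n = model_random_units (max);
      if (model_try_seize (&s, n)) consumed += n;
      assert (s.CurCount >= 0 && s.CurCount <= s.MaxCount);
   }
   return produced - (consumed + s.CurCount);
}
```

In this model the statistics change in the same step as the count, so model_run always returns 0. The threaded test program updates its statistics after the semaphore call returns, which is why the monitor can report a temporary discrepancy.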