about summary refs log tree commit diff
path: root/converter/ppm/ppmtompeg/parallel.c
diff options
context:
space:
mode:
authorgiraffedata <giraffedata@9d0c8265-081b-0410-96cb-a4ca84ce46f8>2006-08-19 03:12:28 +0000
committergiraffedata <giraffedata@9d0c8265-081b-0410-96cb-a4ca84ce46f8>2006-08-19 03:12:28 +0000
commit1fd361a1ea06e44286c213ca1f814f49306fdc43 (patch)
tree64c8c96cf54d8718847339a403e5e67b922e8c3f /converter/ppm/ppmtompeg/parallel.c
downloadnetpbm-mirror-1fd361a1ea06e44286c213ca1f814f49306fdc43.tar.gz
netpbm-mirror-1fd361a1ea06e44286c213ca1f814f49306fdc43.tar.xz
netpbm-mirror-1fd361a1ea06e44286c213ca1f814f49306fdc43.zip
Create Subversion repository
git-svn-id: http://svn.code.sf.net/p/netpbm/code/trunk@1 9d0c8265-081b-0410-96cb-a4ca84ce46f8
Diffstat (limited to 'converter/ppm/ppmtompeg/parallel.c')
-rw-r--r--converter/ppm/ppmtompeg/parallel.c2278
1 files changed, 2278 insertions, 0 deletions
diff --git a/converter/ppm/ppmtompeg/parallel.c b/converter/ppm/ppmtompeg/parallel.c
new file mode 100644
index 00000000..021e6d2b
--- /dev/null
+++ b/converter/ppm/ppmtompeg/parallel.c
@@ -0,0 +1,2278 @@
+/*===========================================================================*
+ * parallel.c              
+ *                         
+ *  Procedures to make encoder run in parallel   
+ *                             
+ *===========================================================================*/
+
+/* COPYRIGHT INFORMATION IS AT THE END OF THIS FILE */
+
+
+/*==============*
+ * HEADER FILES *
+ *==============*/
+
+#define _XOPEN_SOURCE 500 /* Make sure stdio.h contains pclose() */
+/* _ALL_SOURCE is needed on AIX to make the C library include the 
+   socket services (e.g. define struct sockaddr) 
+
+   Note that AIX standards.h actually sets feature declaration macros such
+   as _XOPEN_SOURCE, unless they are already set.
+*/
+#define _ALL_SOURCE
+
+/* On AIX, pm_config.h includes standards.h, which expects to be included
+   after feature declaration macros such as _XOPEN_SOURCE.  So we include
+   pm_config.h as late as possible.
+*/
+
+
+#include <stdarg.h>
+#include <time.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <signal.h>
+#include <netdb.h>
+#include <assert.h>
+#include <sys/types.h>
+#include <sys/times.h>
+
+#include "mallocvar.h"
+#include "nstring.h"
+
+#include "pm.h"
+
+#include "all.h"
+#include "param.h"
+#include "mpeg.h"
+#include "prototypes.h"
+#include "readframe.h"
+#include "fsize.h"
+#include "combine.h"
+#include "frames.h"
+#include "input.h"
+#include "psocket.h"
+#include "frametype.h"
+#include "gethostname.h"
+
+#include "parallel.h"
+
+
+struct childState {
+    boolean      finished;
+    unsigned int startFrame;
+    unsigned int numFrames;
+    unsigned int lastNumFrames;
+    unsigned int numSeconds;
+    float        fps;
+};
+
+
+struct scheduler {
+    /* This tracks the state of the subsystem that determines the assignments
+       for the children
+    */
+    unsigned int nextFrame;
+        /* The next frame that needs to be assigned to a child */
+    unsigned int numFramesInJob;
+        /* Total number of frames in the whole run of Ppmtompeg */
+    unsigned int numMachines;
+};
+
+
+
+#define MAX_IO_SERVERS  10
+#ifndef SOMAXCONN
+#define SOMAXCONN 5
+#endif
+
+/*==================*
+ * CONSTANTS        *
+ *==================*/
+
+#define TERMINATE_PID_SIGNAL    SIGTERM  /* signal used to terminate forked childs */
+#ifndef MAXARGS
+#define MAXARGS     1024   /* Max Number of arguments in safe_fork command */
+#endif
+
+/*==================*
+ * STATIC VARIABLES *
+ *==================*/
+
+static char rsh[256];
+static struct hostent *hostEntry = NULL;
+static boolean  *frameDone;
+static int  outputServerSocket;
+static int  decodeServerSocket;
+static boolean  parallelPerfect = FALSE;
+static  int current_max_forked_pid=0;
+
+
+/*==================*
+ * GLOBAL VARIABLES *
+ *==================*/
+
+extern int yuvHeight, yuvWidth;
+extern char statFileName[256];
+extern FILE *statFile;
+extern boolean debugMachines;
+extern boolean debugSockets;
+int parallelTestFrames = 10;
+int parallelTimeChunks = 60;
+const char *IOhostName;
+int ioPortNumber;
+int decodePortNumber;
+boolean niceProcesses = FALSE;
+boolean forceIalign = FALSE;
+int     machineNumber = -1;
+boolean remoteIO = FALSE;
+bool separateConversion;
+    /* The I/O server will convert from the input format to the base format,
+       and the slave will convert from the base format to the YUV internal
+       format.  If false, the I/O server assumes the input format is the
+       base format and converts from the base format to the YUV internal
+       format; the slave does no conversion.
+    */
+time_t  IOtime = 0;
+extern char encoder_name[];
+int     ClientPid[MAX_MACHINES+4];
+
+
+/*=====================*
+ * INTERNAL PROCEDURES *
+ *=====================*/
+
+
+static void PM_GNU_PRINTF_ATTR(1,2)
+machineDebug(const char format[], ...) {
+
+    va_list args;
+
+    va_start(args, format);
+
+    if (debugMachines) {
+        const char * const hostname = GetHostName();
+        fprintf(stderr, "%s: ---", hostname);
+        strfree(hostname);
+        vfprintf(stderr, format, args);
+        fputc('\n', stderr);
+    }
+    va_end(args);
+}
+
+
+
+static void PM_GNU_PRINTF_ATTR(1,2)
+errorExit(const char format[], ...) {
+
+    const char * const hostname = GetHostName();
+
+    va_list args;
+
+    va_start(args, format);
+
+    fprintf(stderr, "%s: FATAL ERROR.  ", hostname);
+    strfree(hostname);
+    vfprintf(stderr, format, args);
+    fputc('\n', stderr);
+
+    exit(1);
+
+    va_end(args);
+}
+
+
+
+static void
+TransmitPortNum(const char * const hostName, 
+                int          const portNum, 
+                int          const newPortNum) {
+/*----------------------------------------------------------------------------
+   Transmit the port number 'newPortNum' to the master on port 'portNum'
+   of host 'hostName'.
+-----------------------------------------------------------------------------*/
+    int clientSocket;
+    const char * error;
+    
+    ConnectToSocket(hostName, portNum, &hostEntry, &clientSocket, &error);
+    
+    if (error)
+        errorExit("Can't connect in order to transmit port number.  %s",
+                  error);
+
+    WriteInt(clientSocket, newPortNum);
+    
+    close(clientSocket);
+}
+
+
+
+static void
+readYUVDecoded(int          const socketFd,
+               unsigned int const Fsize_x,
+               unsigned int const Fsize_y,
+               MpegFrame *  const frameP) {
+    
+    unsigned int y;
+    
+    for (y = 0; y < Fsize_y; ++y)         /* Y */
+        ReadBytes(socketFd, 
+                  (unsigned char *)frameP->decoded_y[y], Fsize_x);
+    
+    for (y = 0; y < (Fsize_y >> 1); ++y)  /* U */
+        ReadBytes(socketFd, 
+                  (unsigned char *)frameP->decoded_cb[y], (Fsize_x >> 1));
+    
+    for (y = 0; y < (Fsize_y >> 1); ++y)  /* V */
+        ReadBytes(socketFd, 
+                  (unsigned char *)frameP->decoded_cr[y], (Fsize_x >> 1));
+}
+
+
+
+static void
+writeYUVDecoded(int          const socketFd,
+                unsigned int const Fsize_x,
+                unsigned int const Fsize_y,
+                MpegFrame *  const frameP) {
+    
+    unsigned int y;
+    
+    for (y = 0; y < Fsize_y; ++y)         /* Y */
+        WriteBytes(socketFd, 
+                  (unsigned char *)frameP->decoded_y[y], Fsize_x);
+    
+    for (y = 0; y < (Fsize_y >> 1); ++y)  /* U */
+        WriteBytes(socketFd, 
+                   (unsigned char *)frameP->decoded_cb[y], (Fsize_x >> 1));
+    
+    for (y = 0; y < (Fsize_y >> 1); ++y)  /* V */
+        WriteBytes(socketFd, 
+                   (unsigned char *)frameP->decoded_cr[y], (Fsize_x >> 1));
+}
+
+
+
+static void
+writeYUVOrig(int          const socketFd,
+             unsigned int const Fsize_x,
+             unsigned int const Fsize_y,
+             MpegFrame *  const frameP) {
+    
+    unsigned int y;
+    
+    for (y = 0; y < Fsize_y; ++y)         /* Y */
+        WriteBytes(socketFd, 
+                  (unsigned char *)frameP->orig_y[y], Fsize_x);
+    
+    for (y = 0; y < (Fsize_y >> 1); ++y)  /* U */
+        WriteBytes(socketFd, 
+                   (unsigned char *)frameP->orig_cb[y], (Fsize_x >> 1));
+    
+    for (y = 0; y < (Fsize_y >> 1); ++y)  /* V */
+        WriteBytes(socketFd, 
+                   (unsigned char *)frameP->orig_cr[y], (Fsize_x >> 1));
+}
+
+
+
+static void
+readYUVOrig(int          const socketFd,
+            unsigned int const Fsize_x,
+            unsigned int const Fsize_y,
+            MpegFrame *  const frameP) {
+    
+    unsigned int y;
+    
+    for (y = 0; y < Fsize_y; ++y)         /* Y */
+        ReadBytes(socketFd, 
+                  (unsigned char *)frameP->orig_y[y], Fsize_x);
+    
+    for (y = 0; y < (Fsize_y >> 1); ++y)  /* U */
+        ReadBytes(socketFd, 
+                  (unsigned char *)frameP->orig_cb[y], (Fsize_x >> 1));
+    
+    for (y = 0; y < (Fsize_y >> 1); ++y)  /* V */
+        ReadBytes(socketFd, 
+                  (unsigned char *)frameP->orig_cr[y], (Fsize_x >> 1));
+}
+
+
+
+/*===========================================================================*
+ *
+ * EndIOServer
+ *
+ *  called by the master process -- tells the I/O server to commit
+ *  suicide
+ *
+ * RETURNS: nothing
+ *
+ * SIDE EFFECTS:    none
+ *
+ *===========================================================================*/
+static void
+  EndIOServer()
+{
+  /* send signal to IO server:  -1 as frame number */
+  GetRemoteFrame(NULL, -1);
+}
+
+
+/*===========================================================================*
+ *
+ * NotifyDecodeServerReady
+ *
+ *  called by a slave to the Decode Server to tell it a decoded frame
+ *  is ready and waiting
+ *
+ * RETURNS: nothing
+ *
+ * SIDE EFFECTS:    none
+ *
+ *===========================================================================*/
+void
+NotifyDecodeServerReady(int const id) {
+
+    int   clientSocket;
+    time_t  tempTimeStart, tempTimeEnd;
+    const char * error;
+    
+    time(&tempTimeStart);
+    
+    ConnectToSocket(IOhostName, decodePortNumber, &hostEntry, &clientSocket,
+                    &error);
+    
+    if (error)
+        errorExit("CHILD: Can't connect to decode server to tell it a frame "
+                "is ready.  %s", error);
+    
+    WriteInt(clientSocket, id);
+    
+    close(clientSocket);
+    
+    time(&tempTimeEnd);
+    IOtime += (tempTimeEnd-tempTimeStart);
+}
+
+
+
+/*===========================================================================*
+ *
+ * WaitForDecodedFrame
+ *
+ *  blah blah blah
+ *
+ * RETURNS: nothing
+ *
+ * SIDE EFFECTS:    none
+ *
+ *===========================================================================*/
+void
+  WaitForDecodedFrame(id)
+int id;
+{
+  int const negativeTwo = -2;
+  int   clientSocket;
+  int     ready;
+  const char * error;
+
+  /* wait for a decoded frame */
+  if ( debugSockets ) {
+    fprintf(stdout, "WAITING FOR DECODED FRAME %d\n", id);
+  }
+
+  ConnectToSocket(IOhostName, decodePortNumber, &hostEntry, &clientSocket,
+                  &error);
+
+  if (error)
+      errorExit("CHILD: Can't connect to decode server "
+                "to get decoded frame.  %s",
+                error);
+
+  /* first, tell DecodeServer we're waiting for this frame */
+  WriteInt(clientSocket, negativeTwo);
+
+  WriteInt(clientSocket, id);
+
+  ReadInt(clientSocket, &ready);
+
+  if ( ! ready ) {
+    int     waitSocket;
+    int     waitPort;
+    int     otherSock;
+    const char * error;
+
+    /* it's not ready; set up a connection and wait for decode server */
+    CreateListeningSocket(&waitSocket, &waitPort, &error);
+    if (error)
+        errorExit("Unable to create socket on which to listen for "
+                  "decoded frame.  %s", error);
+
+    /* tell decode server where we are */
+    WriteInt(clientSocket, machineNumber);
+
+    WriteInt(clientSocket, waitPort);
+
+    close(clientSocket);
+
+    if ( debugSockets ) {
+      fprintf(stdout, "SLAVE:  WAITING ON SOCKET %d\n", waitPort);
+      fflush(stdout);
+    }
+
+    AcceptConnection(waitSocket, &otherSock, &error);
+    if (error)
+        errorExit("I/O SERVER: Failed to accept next connection.  %s", error);
+
+    /* should we verify this is decode server? */
+    /* for now, we won't */
+
+    close(otherSock);
+
+    close(waitSocket);
+  } else {
+    close(clientSocket);
+  }
+
+  if ( debugSockets ) {
+    fprintf(stdout, "YE-HA FRAME %d IS NOW READY\n", id);
+  }
+}
+
+
+/*===========================================================================*
+ *
+ * SendDecodedFrame
+ *
+ *  Send the frame to the decode server.
+ *
+ * RETURNS: nothing
+ *
+ * SIDE EFFECTS:    none
+ *
+ *===========================================================================*/
+void
+SendDecodedFrame(MpegFrame * const frameP) {
+/*----------------------------------------------------------------------------
+   Send frame *frameP to the decode server.
+-----------------------------------------------------------------------------*/
+    int const negativeTwo = -2;
+    
+    int clientSocket;
+    const char * error;
+    
+    /* send to IOServer */
+    ConnectToSocket(IOhostName, ioPortNumber, &hostEntry,
+                    &clientSocket, &error);
+    if (error)
+        errorExit("CHILD: Can't connect to decode server to "
+                  "give it a decoded frame.  %s", error);
+    
+    WriteInt(clientSocket, negativeTwo);
+    
+    WriteInt(clientSocket, frameP->id);
+
+    writeYUVDecoded(clientSocket, Fsize_x, Fsize_y, frameP);
+    
+    close(clientSocket);
+}
+
+
+/*===========================================================================*
+ *
+ * GetRemoteDecodedFrame
+ *
+ *  get the decoded frame from the decode server.
+ *
+ * RETURNS: nothing
+ *
+ * SIDE EFFECTS:   
+ *
+ *===========================================================================*/
+void
+GetRemoteDecodedRefFrame(MpegFrame * const frameP, 
+                         int         const frameNumber) {
+/*----------------------------------------------------------------------------
+   Get decoded frame number 'frameNumber' *frameP from the decode server.
+-----------------------------------------------------------------------------*/
+  int const negativeThree = -3;
+  int clientSocket;
+  const char * error;
+
+  /* send to IOServer */
+  ConnectToSocket(IOhostName, ioPortNumber, &hostEntry,
+                  &clientSocket, &error);
+  if (error)
+      errorExit("CHILD: Can't connect to decode server "
+                "to get a decoded frame.  %s",
+                error);
+
+  /* ask IOServer for decoded frame */
+  WriteInt(clientSocket, negativeThree);
+
+  WriteInt(clientSocket, frameP->id);
+
+  readYUVDecoded(clientSocket, Fsize_x, Fsize_y, frameP);
+
+  close(clientSocket);
+}
+
+
+/*********
+  routines handling forks, execs, PIDs and signals
+  save, system-style forks
+  apian@ise.fhg.de
+  *******/
+
+
+/*===========================================================================*
+ *
+ * cleanup_fork
+ *
+ *  Kill all the children, to be used when we get killed
+ *
+ * RETURNS: nothing
+ *
+ * SIDE EFFECTS:   kills other processes
+ *
+ *===========================================================================*/
+static void cleanup_fork( dummy )       /* try to kill all child processes */
+     int dummy;
+{
+  register int i;
+  for (i = 0;  i < current_max_forked_pid;  ++i ) {
+
+#ifdef DEBUG_FORK
+    fprintf(stderr, "cleanup_fork: killing PID %d\n", ClientPid[i]);
+#endif
+
+    if (kill(ClientPid[i], TERMINATE_PID_SIGNAL)) {
+      fprintf(stderr, "cleanup_fork: killed PID=%d failed (errno %d)\n", 
+          ClientPid[i], errno);
+    }
+  }
+}
+
+/*===========================================================================*
+ *
+ * safe_fork
+ *
+ *  fork a command
+ *
+ * RETURNS:     success/failure
+ *
+ * SIDE EFFECTS:   Fork the command, and save to PID so you can kil it later!
+ *
+ *===========================================================================*/
+static int safe_fork(command)       /* fork child process and remember its PID */
+     char *command;
+{
+  static int init=0;
+  char *argis[MAXARGS];
+  register int i=1;
+  
+  if (!(argis[0] = strtok(command, " \t"))) return(0); /* tokenize */
+  while ((argis[i] = strtok(NULL, " \t")) && i < MAXARGS) ++i;
+  argis[i] = NULL;
+  
+#ifdef DEBUG_FORK
+  {register int i=0; 
+   fprintf(stderr, "Command %s becomes:\n", command);
+   while(argis[i]) {fprintf(stderr, "--%s--\n", argis[i]); ++i;} }
+#endif
+  
+  if (!init) {          /* register clean-up routine */
+    signal (SIGQUIT, cleanup_fork);
+    signal (SIGTERM, cleanup_fork);
+    signal (SIGINT , cleanup_fork);
+    init=1;
+  }
+  
+  if (-1 == (ClientPid[current_max_forked_pid] = fork()) )  {
+    perror("safe_fork: fork failed ");
+    return(-1);
+  }
+  if( !ClientPid[current_max_forked_pid]) { /* we are in child process */
+    execvp(argis[0], argis );
+    perror("safe_fork child: exec failed ");
+    exit(1);
+  }
+#ifdef DEBUG_FORK
+  fprintf(stderr, "parallel: forked PID=%d\n", ClientPid[current_max_forked_pid]);
+#endif
+  current_max_forked_pid++;
+  return(0);
+}
+
+
+/*=====================*
+ * EXPORTED PROCEDURES *
+ *=====================*/
+
+            /*=================*
+             * IO SERVER STUFF *
+             *=================*/
+
+
+/*===========================================================================*
+ *
+ * SetIOConvert
+ *
+ *  sets the IO conversion to be separate or not.  If separate, then
+ *  some post-processing is done at slave end
+ *
+ * RETURNS: nothing
+ *
+ * SIDE EFFECTS:    none
+ *
+ *===========================================================================*/
+void
+SetIOConvert(bool const separate) {
+    separateConversion = separate;
+}
+
+
+/*===========================================================================*
+ *
+ * SetParallelPerfect
+ *
+ *  If this is called, then frames will be divided up completely, and
+ *  evenly (modulo rounding) between all the processors
+ *
+ * RETURNS: nothing
+ *
+ * SIDE EFFECTS:    Sets parallelPerfect ....
+ *
+ *===========================================================================*/
+void
+SetParallelPerfect(val)
+boolean val;
+{
+    parallelPerfect = val;
+}
+
+
+/*===========================================================================*
+ *
+ * SetRemoteShell
+ *
+ *  sets the remote shell program (usually rsh, but different on some
+ *  machines)
+ *
+ * RETURNS: nothing
+ *
+ * SIDE EFFECTS:    none
+ *
+ *===========================================================================*/
+void
+SetRemoteShell(const char * const shell) {
+    strcpy(rsh, shell);
+}
+
+
+static void
+decodedFrameToDisk(int const otherSock) {
+/*----------------------------------------------------------------------------
+  Get a decoded from from socket 'otherSock' and write it to disk.
+-----------------------------------------------------------------------------*/
+    int frameNumber;
+    MpegFrame * frameP;
+
+    ReadInt(otherSock, &frameNumber);
+    
+    if (debugSockets) {
+        fprintf(stdout, "INPUT SERVER:  GETTING DECODED FRAME %d\n", 
+                frameNumber);
+        fflush(stdout);
+    }
+
+    /* should read frame from socket, then write to disk */
+    frameP = Frame_New(frameNumber, 'i');
+    
+    Frame_AllocDecoded(frameP, TRUE);
+    
+    readYUVDecoded(otherSock, Fsize_x, Fsize_y, frameP);
+
+    /* now output to disk */
+    WriteDecodedFrame(frameP);
+
+    Frame_Free(frameP);
+}        
+
+
+
+static void
+decodedFrameFromDisk(int const otherSock) {
+
+    /* request for decoded frame from disk */
+            
+    int frameNumber;
+    MpegFrame * frameP;
+
+    ReadInt(otherSock, &frameNumber);
+    
+    if (debugSockets) {
+        fprintf(stdout, "INPUT SERVER:  READING DECODED FRAME %d "
+                "from DISK\n", frameNumber);
+        fflush(stdout);
+    }
+
+    /* should read frame from disk, then write to socket */
+    frameP = Frame_New(frameNumber, 'i');
+    
+    Frame_AllocDecoded(frameP, TRUE);
+    
+    ReadDecodedRefFrame(frameP, frameNumber);
+    
+    writeYUVDecoded(otherSock, Fsize_x, Fsize_y, frameP);
+    
+    Frame_Free(frameP);
+}        
+
+
+
+static void
+routeFromSocketToDisk(int              const otherSock,
+                      unsigned char ** const bigBufferP,
+                      unsigned int *   const bigBufferSizeP) {
+
+    /* routing output frame from socket to disk */
+
+    int frameNumber;
+    int numBytes;
+    unsigned char * bigBuffer;
+    unsigned int bigBufferSize;
+    const char * fileName;
+    FILE * filePtr;
+
+    bigBuffer     = *bigBufferP;
+    bigBufferSize = *bigBufferSizeP;
+
+    ReadInt(otherSock, &frameNumber);
+    ReadInt(otherSock, &numBytes);
+
+    /* Expand bigBuffer if necessary to fit this frame */
+    if (numBytes > bigBufferSize) {
+        bigBufferSize = numBytes;
+        if (bigBuffer != NULL)
+            free(bigBuffer);
+
+        MALLOCARRAY_NOFAIL(bigBuffer, bigBufferSize);
+    }
+    
+    /* now read in the bytes */
+    ReadBytes(otherSock, bigBuffer, numBytes);
+    
+    /* open file to output this stuff to */
+    asprintfN(&fileName, "%s.frame.%d", outputFileName, frameNumber);
+    filePtr = fopen(fileName, "wb");
+
+    if (filePtr == NULL)
+        errorExit("I/O SERVER: Could not open output file(3):  %s", fileName);
+
+    strfree(fileName);
+
+    /* now write the bytes here */
+    fwrite(bigBuffer, sizeof(char), numBytes, filePtr);
+    
+    fclose(filePtr);
+    
+    if (debugSockets) {
+        fprintf(stdout, "====I/O SERVER:  WROTE FRAME %d to disk\n",
+                frameNumber);
+        fflush(stdout);
+    }
+
+    *bigBufferP     = bigBuffer;
+    *bigBufferSizeP = bigBufferSize;
+}        
+
+
+
+static void
+readConvertWriteToSocket(struct inputSource * const inputSourceP,
+                         int                  const otherSock,
+                         int                  const frameNumber,
+                         bool *               const endOfStreamP) {
+/*----------------------------------------------------------------------------
+   Get the frame numbered 'frameNumber' from input source
+   *inputSourceP, apply format conversion User requested, and write
+   the "base format" result to socket 'otherSock'.
+-----------------------------------------------------------------------------*/
+    FILE * convertedFileP;
+    
+    convertedFileP = ReadIOConvert(inputSourceP, frameNumber);
+    if (convertedFileP) {
+        bool eof;
+        eof = FALSE;  /* initial value */
+        while (!eof) {
+            unsigned char buffer[1024];
+            unsigned int numBytes;
+            
+            numBytes = fread(buffer, 1, sizeof(buffer), convertedFileP);
+            
+            if (numBytes > 0) {
+                WriteInt(otherSock, numBytes);
+                WriteBytes(otherSock, buffer, numBytes);
+            } else
+                eof = TRUE;
+        }
+        
+        if (strcmp(ioConversion, "*") == 0 )
+            fclose(convertedFileP);
+        else
+            pclose(convertedFileP);
+        
+        *endOfStreamP = FALSE;
+    } else
+        *endOfStreamP = TRUE;
+}
+
+
+
+static void
+readWriteYuvToSocket(struct inputSource * const inputSourceP,
+                     int                  const otherSock,
+                     int                  const frameNumber,
+                     bool *               const endOfStreamP) {
+/*----------------------------------------------------------------------------
+   Read Frame number 'frameNumber' from the input source *inputSourceP,
+   assuming it is in base format, and write its contents in YUV format
+   to socket 'otherSock'.
+
+   Wait for acknowledgement that consumer has received it.
+-----------------------------------------------------------------------------*/
+    MpegFrame * frameP;
+
+    frameP = Frame_New(frameNumber, 'i');
+    
+    ReadFrame(frameP, inputSourceP, frameNumber, inputConversion,
+              endOfStreamP);
+    
+    if (!*endOfStreamP) {
+        writeYUVOrig(otherSock, Fsize_x, Fsize_y, frameP);
+        
+        {
+            /* Make sure we don't leave until other processor read
+               everything
+            */
+            int dummy;
+            ReadInt(otherSock, &dummy);
+            assert(dummy == 0);
+        }
+    }
+    Frame_Free(frameP);
+}
+
+
+
+static void
+readFrameWriteToSocket(struct inputSource * const inputSourceP,
+                       int                  const otherSock,
+                       int                  const frameNumber,
+                       bool *               const endOfStreamP) {
+/*----------------------------------------------------------------------------
+   Read Frame number 'frameNumber' from the input source *inputSourceP
+   and write it to socket 'otherSock'.
+-----------------------------------------------------------------------------*/
+    if (debugSockets) {
+        fprintf(stdout, "I/O SERVER GETTING FRAME %d\n", frameNumber);
+        fflush(stdout);
+    }
+
+    if (separateConversion)
+        readConvertWriteToSocket(inputSourceP, otherSock, frameNumber,
+                                 endOfStreamP);
+    else
+        readWriteYuvToSocket(inputSourceP, otherSock, frameNumber,
+                             endOfStreamP);
+
+    if (debugSockets) {
+        fprintf(stdout, "====I/O SERVER:  READ FRAME %d\n", frameNumber);
+    }
+}
+
+
+
+static void
+processNextConnection(int                  const serverSocket,
+                      struct inputSource * const inputSourceP,
+                      bool *               const doneP,
+                      unsigned char **     const bigBufferP,
+                      unsigned int *       const bigBufferSizeP) {
+
+    int          otherSock;
+    int          command;
+    const char * error;
+    
+    AcceptConnection(serverSocket, &otherSock, &error);
+    if (error)
+        errorExit("I/O SERVER: Failed to accept next connection.  %s", error);
+
+    ReadInt(otherSock, &command);
+
+    switch (command) {
+    case -1:
+        *doneP = TRUE;
+        break;
+    case -2:
+        decodedFrameToDisk(otherSock);
+        break;
+    case -3:
+        decodedFrameFromDisk(otherSock);
+        break;
+    case -4:
+        routeFromSocketToDisk(otherSock, bigBufferP, bigBufferSizeP);
+        break;
+    default: {
+        unsigned int const frameNumber = command;
+
+        bool endOfStream;
+
+        readFrameWriteToSocket(inputSourceP, otherSock, frameNumber,
+                               &endOfStream);
+
+        if (endOfStream) {
+            /* We don't do anything.  Closing the socket with having written
+               anything is our signal that there is no more input.
+
+               (Actually, Ppmtompeg cannot handle stream input in parallel
+               mode -- this code is just infrastructure so maybe it can some
+               day).
+            */
+        }
+    }
+    }
+    close(otherSock);
+}
+ 
+
+
+void
+IoServer(struct inputSource * const inputSourceP,
+         const char *         const parallelHostName, 
+         int                  const portNum) {
+/*----------------------------------------------------------------------------
+   Execute an I/O server.
+
+   An I/O server is the partner on the master machine of a child process
+   on a "remote" system.  "Remote" here doesn't just mean on another system.
+   It means on a system that isn't even in the same cluster -- specifically,
+   a system that doesn't have access to the same filesystem as the master.
+
+   The child process passes frame contents between it and the master via
+   the I/O server.
+-----------------------------------------------------------------------------*/
+    int       ioPortNum;
+    int       serverSocket;
+    boolean   done;
+    unsigned char   *bigBuffer;
+        /* A work buffer that we keep around permanently.  We increase
+           its size as needed, but never shrink it.
+        */
+    unsigned int bigBufferSize;
+        /* The current allocated size of bigBuffer[] */
+    const char * error;
+
+    bigBufferSize = 0;  /* Start with no buffer */
+    bigBuffer = NULL;
+    
+    /* once we get IO port num, should transmit it to parallel server */
+
+    CreateListeningSocket(&serverSocket, &ioPortNum, &error);
+    if (error)
+        errorExit("Unable to create socket on which to listen for "
+                  "reports from children.  %s", error);
+
+    if (debugSockets)
+        fprintf(stdout, "====I/O USING PORT %d\n", ioPortNum);
+
+    TransmitPortNum(parallelHostName, portNum, ioPortNum);
+
+    if (separateConversion)
+        SetFileType(ioConversion);  /* for reading */
+    else
+        SetFileType(inputConversion);
+
+    done = FALSE;  /* initial value */
+
+    while (!done)
+        processNextConnection(serverSocket, inputSourceP,
+                              &done, &bigBuffer, &bigBufferSize);
+
+    close(serverSocket);
+
+    if ( debugSockets ) {
+        fprintf(stdout, "====I/O SERVER:  Shutting Down\n");
+    }
+}
+
+
+
+/*===========================================================================*
+ *
+ * SendRemoteFrame
+ *
+ *  called by a slave to the I/O server; sends an encoded frame
+ *  to the server to be sent to disk
+ *
+ * RETURNS: nothing
+ *
+ * SIDE EFFECTS:    none
+ *
+ *===========================================================================*/
+void
+SendRemoteFrame(int const frameNumber, BitBucket * const bb) {
+
+    int const negativeFour = -4;
+    int clientSocket;
+    time_t  tempTimeStart, tempTimeEnd;
+    const char * error;
+
+    time(&tempTimeStart);
+
+    ConnectToSocket(IOhostName, ioPortNumber, &hostEntry,
+                    &clientSocket, &error);
+    if (error)
+        errorExit("CHILD: Can't connect to I/O server to deliver results.  %s",
+                  error);
+
+    WriteInt(clientSocket, negativeFour);
+
+    WriteInt(clientSocket, frameNumber);
+    
+    if (frameNumber != -1) {
+        /* send number of bytes */
+        
+        WriteInt(clientSocket, (bb->totalbits+7)>>3);
+    
+        /* now send the bytes themselves */
+        Bitio_WriteToSocket(bb, clientSocket);
+    }
+
+    close(clientSocket);
+
+    time(&tempTimeEnd);
+    IOtime += (tempTimeEnd-tempTimeStart);
+}
+
+
+
+void
+GetRemoteFrame(MpegFrame * const frameP, 
+               int         const frameNumber) {
+/*----------------------------------------------------------------------------
+   Get a frame from the I/O server.
+   
+   This is intended for use by a child.
+-----------------------------------------------------------------------------*/
+    int           clientSocket;
+    const char * error;
+
+    Fsize_Note(frameNumber, yuvWidth, yuvHeight);
+
+    if (debugSockets) {
+        fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n",
+                getenv("HOST"), frameNumber);
+        fflush(stdout);
+    }
+
+    ConnectToSocket(IOhostName, ioPortNumber, &hostEntry,
+                    &clientSocket, &error);
+
+    if (error)
+        errorExit("CHILD: Can't connect to I/O server to get a frame.  %s", 
+                  error);
+
+    WriteInt(clientSocket, frameNumber);
+
+    if (frameNumber != -1) {
+        if (separateConversion) {
+            unsigned char buffer[1024];
+            /* This is by design the exact size of the data per message (except
+               the last message for a frame) the I/O server sends.
+            */
+            int numBytes;  /* Number of data bytes in message */
+            FILE * filePtr = pm_tmpfile();
+
+            /* read in stuff, write to file, perform local conversion */
+            do {
+                ReadInt(clientSocket, &numBytes);
+
+                if (numBytes > sizeof(buffer))
+                    errorExit("Invalid message received: numBytes = %d, "
+                              "which is greater than %d\n", 
+                              numBytes, sizeof(numBytes));
+                ReadBytes(clientSocket, buffer, numBytes);
+
+                fwrite(buffer, 1, numBytes, filePtr);
+            } while ( numBytes == sizeof(buffer) );
+            fflush(filePtr);
+            {
+                bool endOfStream;
+                rewind(filePtr);
+                /* I/O Server gave us base format.  Read it as an MpegFrame */
+                ReadFrameFile(frameP, filePtr, slaveConversion, &endOfStream);
+                assert(!endOfStream);
+            }
+            fclose(filePtr);
+        } else {
+            Frame_AllocYCC(frameP);
+
+            if (debugSockets) {
+                fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n",
+                        getenv("HOST"), frameNumber);
+                fflush(stdout);
+            }
+            /* I/O Server gave us internal YUV format.  Read it as MpegFrame */
+            readYUVOrig(clientSocket, yuvWidth, yuvHeight, frameP);
+        }
+    }
+
+    WriteInt(clientSocket, 0);
+
+    close(clientSocket);
+
+    if (debugSockets) {
+        fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n",
+                getenv("HOST"), frameNumber);
+        fflush(stdout);
+    }
+}
+
+
+struct combineControl {
+    unsigned int numFrames;
+};
+
+
+
+
+static void
+getAndProcessACombineConnection(int const outputServerSocket) {
+    int          otherSock;
+    int          command;
+    const char * error;
+
+    AcceptConnection(outputServerSocket, &otherSock, &error);
+
+    if (error)
+        errorExit("COMBINE SERVER: "
+                  "Failed to accept next connection.  %s", error);
+    
+    ReadInt(otherSock, &command);
+    
+    if (command == -2) {
+        /* this is notification from non-remote process that a
+           frame is done.
+            */
+        int frameStart, frameEnd;
+
+        ReadInt(otherSock, &frameStart);
+        ReadInt(otherSock, &frameEnd);
+            
+        machineDebug("COMBINE_SERVER: Frames %d - %d done",
+                     frameStart, frameEnd);
+        {
+            unsigned int i;
+            for (i = frameStart; i <= frameEnd; ++i)
+                frameDone[i] = TRUE;
+        }
+    } else
+        errorExit("COMBINE SERVER: Unrecognized command %d received.",
+                  command);
+
+    close(otherSock);
+}
+
+
+
+#define READ_ATTEMPTS 5 /* number of times (seconds) to retry an input file */
+
+
+static void
+openInputFile(const char * const fileName,
+              FILE **      const inputFilePP) {
+
+    FILE * inputFileP;
+    unsigned int attempts;
+    
+    inputFileP = NULL;
+    attempts = 0;
+
+    while (!inputFileP && attempts < READ_ATTEMPTS) {
+        inputFileP = fopen(fileName, "rb");
+        if (inputFileP == NULL) {
+            pm_message("ERROR  Couldn't read frame file '%s' errno = %d (%s)"
+                       "attempt %d", 
+                       fileName, errno, strerror(errno), attempts);
+            sleep(1);
+        }
+        ++attempts;
+    }
+    if (inputFileP == NULL)
+        pm_error("Unable to open file '%s' after %d attempts.", 
+                 fileName, attempts);
+
+    *inputFilePP = inputFileP;
+}
+
+
+
+static void
+waitForOutputFile(void *        const inputHandle,
+                  unsigned int  const frameNumber,
+                  FILE **       const ifPP) {
+/*----------------------------------------------------------------------------
+   Keep handling output events until we get the specified frame number.
+   Open the file it's in and return the stream handle.
+-----------------------------------------------------------------------------*/
+    struct combineControl * const combineControlP = (struct combineControl *)
+        inputHandle;
+
+    if (frameNumber >= combineControlP->numFrames)
+        *ifPP = NULL;
+    else {
+        const char * fileName;
+
+        while (!frameDone[frameNumber]) {
+            machineDebug("COMBINE_SERVER: Waiting for frame %u done", 
+                         frameNumber);
+
+            getAndProcessACombineConnection(outputServerSocket);
+        }
+        machineDebug("COMBINE SERVER: Wait for frame %u over", frameNumber);
+
+        asprintfN(&fileName, "%s.frame.%u", outputFileName, frameNumber);
+
+        openInputFile(fileName, ifPP);
+
+        strfree(fileName);
+    }
+}
+
+
+
+static void
+unlinkFile(void *       const inputHandle,
+           unsigned int const frameNumber) {
+
+    if (!keepTempFiles) {
+        const char * fileName;
+
+        asprintfN(&fileName, "%s.frame.%u", outputFileName, frameNumber);
+
+        unlink(fileName);
+
+        strfree(fileName);
+    }
+}
+
+
+
+void
+CombineServer(int          const numFrames, 
+              const char * const masterHostName, 
+              int          const masterPortNum,
+              const char * const outputFileName) {
+/*----------------------------------------------------------------------------
+   Execute a combine server.
+
+   This handles combination of frames.
+-----------------------------------------------------------------------------*/
+  int    combinePortNum;
+  FILE * ofP;
+  const char * error;
+  struct combineControl combineControl;
+  
+  /* once we get Combine port num, should transmit it to parallel server */
+  
+  CreateListeningSocket(&outputServerSocket, &combinePortNum, &error);
+  if (error)
+      errorExit("Unable to create socket on which to listen.  %s", error);
+
+  machineDebug("COMBINE SERVER: LISTENING ON PORT %d", combinePortNum);
+  
+  TransmitPortNum(masterHostName, masterPortNum, combinePortNum);
+  
+  MALLOCARRAY_NOFAIL(frameDone, numFrames);
+  {
+      unsigned int i;
+      for (i = 0; i < numFrames; ++i)
+          frameDone[i] = FALSE;
+  }
+  ofP = pm_openw(outputFileName);
+  
+  combineControl.numFrames = numFrames;
+
+  FramesToMPEG(ofP, &combineControl, &waitForOutputFile, &unlinkFile);
+
+  machineDebug("COMBINE SERVER: Shutting down");
+  
+  /* tell Master server we are done */
+  TransmitPortNum(masterHostName, masterPortNum, combinePortNum);
+  
+  close(outputServerSocket);
+
+  fclose(ofP);
+}
+
+
+/*=====================*
+ * MASTER SERVER STUFF *
+ *=====================*/
+
+
+static void
+startCombineServer(const char * const encoderName,
+                   unsigned int const numMachines,
+                   const char * const masterHostName,
+                   int          const masterPortNum,
+                   unsigned int const numInputFiles,
+                   const char * const paramFileName,
+                   int          const masterSocket,
+                   int *        const combinePortNumP) {
+
+    char         command[1024];
+    int          otherSock;
+    const char * error;
+
+    snprintf(command, sizeof(command), 
+             "%s %s -max_machines %d -output_server %s %d %d %s",
+             encoderName, 
+             debugMachines ? "-debug_machines" : "",
+             numMachines, masterHostName, masterPortNum, 
+             numInputFiles, paramFileName);
+
+    machineDebug("MASTER: Starting combine server with shell command '%s'",
+                 command);
+
+    safe_fork(command);
+    
+    machineDebug("MASTER: Listening for connection back from "
+                 "new Combine server");
+
+    AcceptConnection(masterSocket, &otherSock, &error);
+    if (error)
+        errorExit("MASTER SERVER: "
+                  "Failed to accept next connection.  %s", error);
+
+    ReadInt(otherSock, combinePortNumP);
+    close(otherSock);
+
+    machineDebug("MASTER:  Combine port number = %d", *combinePortNumP);
+}
+
+
+
+static void
+startDecodeServer(const char * const encoderName,
+                  unsigned int const numMachines,
+                  const char * const masterHostName,
+                  int          const masterPortNum,
+                  unsigned int const numInputFiles,
+                  const char * const paramFileName,
+                  int          const masterSocket,
+                  int *        const decodePortNumP) {
+
+    char         command[1024];
+    int          otherSock;
+    const char * error;
+
+    snprintf(command, sizeof(command), 
+             "%s %s -max_machines %d -decode_server %s %d %d %s",
+             encoder_name, 
+             debugMachines ? "-debug_machines" : "",
+             numMachines, masterHostName, masterPortNum,
+             numInputFiles, paramFileName);
+
+    machineDebug("MASTER: Starting decode server with shell command '%s'",
+                 command);
+
+    safe_fork(command);
+
+    machineDebug("MASTER: Listening for connection back from "
+                 "new Decode server");
+
+    AcceptConnection(masterSocket, &otherSock, &error);
+    if (error)
+        errorExit("MASTER SERVER: "
+                  "Failed to accept connection back from the new "
+                  "decode server.  %s", error);
+
+    ReadInt(otherSock, decodePortNumP);
+    
+    close(otherSock);
+    
+    machineDebug("MASTER:  Decode port number = %d", *decodePortNumP);
+}
+
+
+
+static void
+startIoServer(const char *   const encoderName,
+              unsigned int   const numChildren,
+              const char *   const masterHostName,
+              int            const masterPortNum,
+              int            const masterSocket,
+              const char *   const paramFileName,
+              int *          const ioPortNumP) {
+              
+    char         command[1024];
+    int          otherSock;
+    const char * error;
+    
+    sprintf(command, "%s -max_machines %d -io_server %s %d %s",
+            encoderName, numChildren, masterHostName, masterPortNum,
+            paramFileName);
+
+    machineDebug("MASTER: Starting I/O server with remote shell command '%s'",
+                 command);
+
+    safe_fork(command);
+    
+    machineDebug("MASTER: Listening for connection back from "
+                 "new I/O server");
+
+    AcceptConnection(masterSocket, &otherSock, &error);
+    if (error)
+        errorExit("MASTER SERVER: "
+                  "Failed to accept connection back from the new "
+                  "I/O server.  %s", error);
+    
+    ReadInt(otherSock, ioPortNumP);
+    close(otherSock);
+    
+    machineDebug("MASTER:  I/O port number = %d", *ioPortNumP);
+}    
+    
+    
+
+static void
+extendToEndOfPattern(unsigned int * const nFramesP,
+                     unsigned int   const startFrame,
+                     unsigned int   const framePatternLen,
+                     unsigned int   const numFramesInStream) {
+
+    assert(framePatternLen >= 1);
+        
+    while (startFrame + *nFramesP < numFramesInStream &&
+           (startFrame + *nFramesP) % framePatternLen != 0)
+        ++(*nFramesP);
+}
+
+
+
+static void
+allocateInitialFrames(struct scheduler * const schedulerP,
+                      boolean            const parallelPerfect,
+                      boolean            const forceIalign,
+                      unsigned int       const framePatternLen,
+                      unsigned int       const parallelTestFrames,
+                      unsigned int       const childNum,
+                      unsigned int *     const startFrameP,
+                      unsigned int *     const nFramesP) {
+/*----------------------------------------------------------------------------
+   Choose which frames, to hand out to the new child numbered 'childNum'.
+-----------------------------------------------------------------------------*/
+    unsigned int const framesPerChild = 
+        MAX(1, ((schedulerP->numFramesInJob - schedulerP->nextFrame) /
+                (schedulerP->numMachines - childNum)));
+
+    unsigned int nFrames;
+
+    if (parallelPerfect)
+        nFrames = framesPerChild;
+    else {
+        assert(parallelTestFrames >= 1);
+
+        nFrames = MIN(parallelTestFrames, framesPerChild);
+    }
+    if (forceIalign)
+        extendToEndOfPattern(&nFrames, schedulerP->nextFrame,
+                             framePatternLen, schedulerP->numFramesInJob);
+
+    nFrames = MIN(nFrames, schedulerP->numFramesInJob - schedulerP->nextFrame);
+
+    *startFrameP = schedulerP->nextFrame;
+    *nFramesP = nFrames;
+    schedulerP->nextFrame += nFrames;
+}
+
+
+
+static float
+taperedGoalTime(struct childState const childState[],
+                unsigned int      const remainingFrameCount) {
+        
+    float        goalTime;
+    float        allChildrenFPS;
+    float        remainingJobTime;
+        /* How long we expect it to be before the whole movie is encoded*/
+    float        sum;
+    int          numMachinesToEstimate;
+    unsigned int childNum;
+    
+    /* frames left = lastFrameInStream - startFrame + 1 */
+    for (childNum = 0, sum = 0.0, numMachinesToEstimate = 0; 
+         childNum < numMachines; ++childNum) {
+        if (!childState[childNum].finished) {
+            if (childState[childNum].fps < 0.0 )
+                ++numMachinesToEstimate;
+            else
+                sum += childState[childNum].fps;
+        }
+    }
+    
+    allChildrenFPS = (float)numMachines *
+        (sum/(float)(numMachines-numMachinesToEstimate));
+    
+    remainingJobTime = (float)remainingFrameCount/allChildrenFPS;
+    
+    goalTime = MAX(5.0, remainingJobTime/2);
+
+    return goalTime;
+}
+
+
+
+static void
+allocateMoreFrames(struct scheduler * const schedulerP,
+                   unsigned int       const childNum,
+                   struct childState  const childState[],
+                   bool               const forceIalign,
+                   unsigned int       const framePatternLen,
+                   bool               const goalTimeSpecified,
+                   unsigned int       const goalTimeArg,
+                   unsigned int *     const startFrameP,
+                   unsigned int *     const nFramesP) {
+/*----------------------------------------------------------------------------
+   Decide which frames should be child 'childNum''s next assignment,
+   given the state/history of all children is childState[].
+
+   The lowest numbered frame which needs yet to be encoded is frame
+   number 'startFrame' and 'lastFrameInStream' is the highest.
+
+   The allocation always starts at the lowest numbered frame that
+   hasn't yet been allocated and is sequential.  We return as
+   *startFrameP the frame number of the first frame in the allocation
+   and as *nFramesP the number of frames.
+
+   If 'goalTimeSpecified' is true, we try to make the assignment take
+   'goalTimeArg' seconds.  If 'goalTimeSpecified' is not true, we choose
+   a goal time ourselves, which is based on how long we think it will
+   take for all the children to finish all the remaining frames.
+-----------------------------------------------------------------------------*/
+    float goalTime;
+        /* Number of seconds we want the assignment to take.  We size the
+           assignment to try to meet this goal.
+        */
+    unsigned int nFrames;
+    float avgFps;
+
+    if (!goalTimeSpecified) {
+        goalTime = taperedGoalTime(childState,
+                                   schedulerP->numFramesInJob - 
+                                   schedulerP->nextFrame);
+    
+        pm_message("MASTER: ASSIGNING %s %.2f seconds of work",
+                   machineName[childNum], goalTime);
+    } else
+        goalTime = goalTimeArg;
+    
+    if (childState[childNum].numSeconds != 0)
+        avgFps = (float)childState[childNum].numFrames / 
+            childState[childNum].numSeconds;
+    else
+        avgFps = 0.1;       /* arbitrary small value */
+
+    nFrames = MAX(1u, (unsigned int)(goalTime * avgFps + 0.5));
+    
+    nFrames = MIN(nFrames, 
+                  schedulerP->numFramesInJob - schedulerP->nextFrame);
+
+    if (forceIalign)
+        extendToEndOfPattern(&nFrames, schedulerP->nextFrame,
+                             framePatternLen, schedulerP->numFramesInJob);
+
+    *startFrameP = schedulerP->nextFrame;
+    *nFramesP = nFrames;
+    schedulerP->nextFrame += nFrames;
+}
+
+
+
+static void
+startChildren(struct scheduler *   const schedulerP,
+              const char *         const encoderName,
+              const char *         const masterHostName,
+              int                  const masterPortNum,
+              const char *         const paramFileName,
+              boolean              const parallelPerfect,
+              boolean              const forceIalign,
+              unsigned int         const framePatternLen,
+              unsigned int         const parallelTestFrames,
+              boolean              const beNice,
+              int                  const masterSocket,
+              int                  const combinePortNum,
+              int                  const decodePortNum,
+              int *                const ioPortNum,
+              unsigned int *       const numIoServersP,
+              struct childState ** const childStateP) {
+/*----------------------------------------------------------------------------
+   Start up the children.  Tell them to work for the master at
+   'masterHostName':'masterPortNum'.
+
+   Start I/O servers (as processes on this system) as required and return
+   the port numbers of the TCP ports on which they listen as
+   ioPortNum[] and the number of them as *numIoServersP.
+
+   Give each of the children some initial work to do.  This may be just
+   a small amount for timing purposes.
+
+   We access and manipulate the various global variables that represent
+   the state of the children, and the scheduler structure.
+-----------------------------------------------------------------------------*/
+    struct childState * childState;  /* malloc'ed */
+    unsigned int childNum;
+    unsigned int numIoServers;
+    unsigned int childrenLeftCurrentIoServer;
+        /* The number of additional children we can hook up to the
+           current I/O server before reaching our maximum children per
+           I/O server.  0 if there is no current I/O server.
+        */
+
+    MALLOCARRAY_NOFAIL(childState, schedulerP->numMachines);
+
+    childrenLeftCurrentIoServer = 0;  /* No current I/O server yet */
+    
+    numIoServers = 0;  /* None created yet */
+
+    for (childNum = 0; childNum < schedulerP->numMachines; ++childNum) {
+        char command[1024];
+        unsigned int startFrame;
+        unsigned int nFrames;
+
+        childState[childNum].fps        = -1.0;  /* illegal value as flag */
+        childState[childNum].numSeconds = 0;
+
+        allocateInitialFrames(schedulerP, parallelPerfect, forceIalign,
+                              framePatternLen, parallelTestFrames,
+                              childNum, &startFrame, &nFrames);
+
+        if (nFrames == 0) {
+            childState[childNum].finished = TRUE;
+            machineDebug("MASTER: No more frames; not starting child '%s'",
+                         machineName[childNum]);
+        } else {
+            childState[childNum].finished   = FALSE;
+        
+            if (remote[childNum]) {
+                if (childrenLeftCurrentIoServer == 0) {
+                    startIoServer(encoderName, schedulerP->numMachines, 
+                                  masterHostName, masterPortNum, masterSocket,
+                                  paramFileName, &ioPortNum[numIoServers++]);
+                    
+                    childrenLeftCurrentIoServer = SOMAXCONN;
+                }
+                --childrenLeftCurrentIoServer;
+            } 
+            snprintf(command, sizeof(command),
+                     "%s %s -l %s %s "
+                     "%s %s -child %s %d %d %d %d %d %d "
+                     "-frames %d %d %s",
+                     rsh,
+                     machineName[childNum], userName[childNum],
+                     beNice ? "nice" : "",
+                     executable[childNum],
+                     debugMachines ? "-debug_machines" : "",
+                     masterHostName, masterPortNum, 
+                     remote[childNum] ? ioPortNum[numIoServers-1] : 0,
+                     combinePortNum, decodePortNum, childNum,
+                     remote[childNum] ? 1 : 0,
+                     startFrame, startFrame + nFrames - 1,
+                     remote[childNum] ? 
+                         remoteParamFile[childNum] : paramFileName
+            );
+        
+            machineDebug("MASTER: Starting child server "
+                         "with shell command '%s'", command);
+
+            safe_fork(command);
+
+            machineDebug("MASTER: Frames %d-%d assigned to new child %s",
+                         startFrame, startFrame + nFrames - 1, 
+                         machineName[childNum]);
+        }
+        childState[childNum].startFrame = startFrame;
+        childState[childNum].lastNumFrames = nFrames;
+        childState[childNum].numFrames = childState[childNum].lastNumFrames;
+    }
+    *childStateP   = childState;
+    *numIoServersP = numIoServers;
+}
+
+
+
+static void
+noteFrameDone(const char * const combineHostName,
+              int          const combinePortNum,
+              unsigned int const frameStart, 
+              unsigned int const frameEnd) {
+/*----------------------------------------------------------------------------
+   Tell the Combine server that frames 'frameStart' through 'frameEnd'
+   are done.
+-----------------------------------------------------------------------------*/
+    int const negativeTwo = -2;
+    int clientSocket;
+    time_t  tempTimeStart, tempTimeEnd;
+    const char * error;
+    struct hostent * hostEntP;
+
+    time(&tempTimeStart);
+
+    hostEntP = NULL;
+
+    ConnectToSocket(combineHostName, combinePortNum, &hostEntP,
+                    &clientSocket, &error);
+    
+    if (error)
+        errorExit("MASTER: Can't connect to Combine server to tell it frames "
+                  "are done.  %s", error);
+
+    WriteInt(clientSocket, negativeTwo);
+
+    WriteInt(clientSocket, frameStart);
+
+    WriteInt(clientSocket, frameEnd);
+
+    close(clientSocket);
+
+    time(&tempTimeEnd);
+    IOtime += (tempTimeEnd-tempTimeStart);
+}
+
+
+
+static void
+feedTheChildren(struct scheduler * const schedulerP,
+                struct childState        childState[],
+                int                const masterSocket,
+                const char *       const combineHostName,
+                int                const combinePortNum,
+                bool               const forceIalign,
+                unsigned int       const framePatternLen,
+                bool               const goalTimeSpecified,
+                unsigned int       const goalTime) {
+/*----------------------------------------------------------------------------
+   Listen for children to tell us they have finished their assignments
+   and give them new assignments, until all the frames have been assigned
+   and all the children have finished.
+
+   As children finish assignments, inform the combine server at
+   'combineHostName':'combinePortNum' of such.
+
+   Note that the children got initial assigments when they were created.
+   So the first thing we do is wait for them to finish those.
+-----------------------------------------------------------------------------*/
+    unsigned int numFinished;
+        /* Number of child machines that have been excused because there
+           is no more work for them.
+        */
+    unsigned int framesDone;
+
+    numFinished = 0;
+    framesDone = 0;
+
+    while (numFinished != schedulerP->numMachines) {
+        int                 otherSock;
+        int                 childNum;
+        int                 seconds;
+        float               framesPerSecond;
+        struct childState * csP;
+        const char *        error;
+        unsigned int nextFrame;
+        unsigned int nFrames;
+
+        machineDebug("MASTER: Listening for a connection...");
+
+        AcceptConnection(masterSocket, &otherSock, &error);
+        if (error)
+            errorExit("MASTER SERVER: "
+                      "Failed to accept next connection.  %s", error);
+
+        ReadInt(otherSock, &childNum);
+        ReadInt(otherSock, &seconds);
+
+        csP = &childState[childNum];
+        
+        csP->numSeconds += seconds;
+        csP->fps = (float)csP->numFrames / (float)csP->numSeconds;
+
+        if (seconds != 0)
+            framesPerSecond = (float)csP->lastNumFrames / (float)seconds;
+        else
+            framesPerSecond = (float)csP->lastNumFrames * 2.0;
+
+        machineDebug("MASTER: Child %s FINISHED ASSIGNMENT.  "
+                     "%f frames per second", 
+                     machineName[childNum], framesPerSecond);
+
+        noteFrameDone(combineHostName, combinePortNum, csP->startFrame,
+                      csP->startFrame + csP->lastNumFrames - 1);
+
+        framesDone += csP->lastNumFrames;
+
+        allocateMoreFrames(schedulerP, childNum, childState,
+                           forceIalign, framePatternLen,
+                           goalTimeSpecified, goalTime,
+                           &nextFrame, &nFrames);
+
+        if (nFrames == 0) {
+            WriteInt(otherSock, -1);
+            WriteInt(otherSock, 0);
+
+            ++numFinished;
+
+            machineDebug("MASTER: NO MORE WORK FOR CHILD %s.  "
+                         "(%d of %d children now done)",
+                         machineName[childNum], numFinished, numMachines);
+        } else {
+            WriteInt(otherSock, nextFrame);
+            WriteInt(otherSock, nextFrame + nFrames - 1);
+
+            machineDebug("MASTER: Frames %d-%d assigned to child %s",
+                         nextFrame, nextFrame + nFrames - 1,
+                         machineName[childNum]);
+
+            csP->startFrame    = nextFrame;
+            csP->lastNumFrames = nFrames;
+            csP->numFrames    += csP->lastNumFrames;
+        }
+        close(otherSock);
+
+        machineDebug("MASTER: %d/%d DONE; %d ARE ASSIGNED",
+                     framesDone, schedulerP->numFramesInJob, 
+                     schedulerP->nextFrame - framesDone);
+    }
+}
+
+
+
+static void
+stopIoServers(const char * const hostName,
+              int          const ioPortNum[],
+              unsigned int const numIoServers) {
+
+    unsigned int childNum;
+
+    IOhostName = hostName;
+    for (childNum = 0; childNum < numIoServers; ++childNum) {
+        ioPortNumber = ioPortNum[childNum];
+        EndIOServer();
+    }
+}
+
+
+
+static void
+waitForCombineServerToTerminate(int const masterSocket) {
+
+    int otherSock;
+    const char * error;
+
+    machineDebug("MASTER SERVER: Waiting for combine server to terminate");
+
+    AcceptConnection(masterSocket, &otherSock, &error);
+    if (error)
+        errorExit("MASTER SERVER: "
+                  "Failed to accept connection expected from a "
+                  "terminating combine server.  %s", error);
+
+    {
+        int dummy;
+        ReadInt(otherSock, &dummy);
+    }
+    close(otherSock);
+}
+    
+
+
+static void
+printFinalStats(FILE *            const statfileP,
+                time_t            const startUpBegin,
+                time_t            const startUpEnd,
+                time_t            const shutDownBegin,
+                time_t            const shutDownEnd,
+                unsigned int      const numChildren,
+                struct childState const childState[],
+                unsigned int      const numFrames) {
+
+    unsigned int pass;
+    FILE * fileP;
+
+    for (pass = 0; pass < 2; ++pass) {
+        if (pass == 0)
+            fileP = stdout;
+        else
+            fileP = statfileP;
+
+        if (fileP) {
+            unsigned int childNum;
+            float totalFPS;
+
+            fprintf(fileP, "\n\n");
+            fprintf(fileP, "PARALLEL SUMMARY\n");
+            fprintf(fileP, "----------------\n");
+            fprintf(fileP, "\n");
+            fprintf(fileP, "START UP TIME:  %u seconds\n",
+                    (unsigned int)(startUpEnd - startUpBegin));
+            fprintf(fileP, "SHUT DOWN TIME:  %u seconds\n",
+                    (unsigned int)(shutDownEnd - shutDownBegin));
+            
+            fprintf(fileP, 
+                    "%14.14s %8.8s %8.8s %12.12s %9.9s\n",
+                    "MACHINE", "Frames", "Seconds", "Frames/Sec",
+                    "Self Time");
+
+            fprintf(fileP, 
+                    "%14.14s %8.8s %8.8s %12.12s %9.9s\n",
+                    "--------------", "--------", "--------", "------------",
+                    "---------");
+
+            totalFPS = 0.0;
+            for (childNum = 0; childNum < numChildren; ++childNum) {
+                float const localFPS = 
+                    (float)childState[childNum].numFrames /
+                    childState[childNum].numSeconds;
+                fprintf(fileP, "%14.14s %8u %8u %12.4f %8u\n",
+                        machineName[childNum], 
+                        childState[childNum].numFrames, 
+                        childState[childNum].numSeconds,
+                        localFPS, 
+                        (unsigned int)((float)numFrames/localFPS));
+                totalFPS += localFPS;
+            }
+
+            fprintf(fileP, 
+                    "%14.14s %8.8s %8.8s %12.12s %9.9s\n",
+                    "--------------", "--------", "--------", "------------",
+                    "---------");
+
+            fprintf(fileP, "%14s %8.8s %8u %12.4f\n", 
+                    "OPTIMAL", "", 
+                    (unsigned int)((float)numFrames/totalFPS),
+                    totalFPS);
+            
+            {
+                unsigned int const diffTime = shutDownEnd - startUpBegin;
+                
+                fprintf(fileP, "%14s %8.8s %8u %12.4f\n", 
+                        "ACTUAL", "", diffTime, 
+                        (float)numFrames / diffTime);
+            }
+            fprintf(fileP, "\n\n");
+        }
+    }
+}
+
+
+
+void
+MasterServer(struct inputSource * const inputSourceP,
+             const char *         const paramFileName, 
+             const char *         const outputFileName) {
+/*----------------------------------------------------------------------------
+   Execute the master server function.
+
+   Start all the other servers.
+-----------------------------------------------------------------------------*/
+    const char *hostName;
+    int       portNum;
+    int       masterSocket;
+        /* The file descriptor for the socket on which the master listens */
+    int ioPortNum[MAX_IO_SERVERS];
+    int       combinePortNum, decodePortNum;
+    struct childState * childState;  /* malloc'ed */
+    unsigned int numIoServers;
+    time_t  startUpBegin, startUpEnd;
+    time_t  shutDownBegin, shutDownEnd;
+    const char * error;
+    struct scheduler scheduler;
+
+    time(&startUpBegin);
+
+    scheduler.nextFrame = 0;
+    scheduler.numFramesInJob = inputSourceP->numInputFiles;
+    scheduler.numMachines = numMachines;
+
+    PrintStartStats(startUpBegin, FALSE, 0, 0, inputSourceP);
+
+    hostName = GetHostName();
+
+    hostEntry = gethostbyname(hostName);
+    if (hostEntry == NULL)
+        errorExit("Could not find host name '%s' in database", hostName);
+
+    CreateListeningSocket(&masterSocket, &portNum, &error);
+    if (error)
+        errorExit("Unable to create socket on which to listen.  %s", error);
+
+    if (debugSockets)
+        fprintf(stdout, "---MASTER USING PORT %d\n", portNum);
+
+    startCombineServer(encoder_name, numMachines, hostName, portNum,
+                       inputSourceP->numInputFiles, 
+                       paramFileName, masterSocket, 
+                       &combinePortNum);
+
+    if (referenceFrame == DECODED_FRAME)
+        startDecodeServer(encoder_name, numMachines, hostName, portNum,
+                          inputSourceP->numInputFiles, 
+                          paramFileName, masterSocket,
+                          &decodePortNum);
+
+    startChildren(&scheduler, encoder_name, hostName, portNum,
+                  paramFileName, parallelPerfect, forceIalign,
+                  framePatternLen, parallelTestFrames, 
+                  niceProcesses,
+                  masterSocket, combinePortNum, decodePortNum, 
+                  ioPortNum, &numIoServers,
+                  &childState);
+
+    time(&startUpEnd);
+
+    feedTheChildren(&scheduler, childState,
+                    masterSocket, hostName, combinePortNum,
+                    forceIalign, framePatternLen,
+                    parallelTimeChunks != -1, parallelTimeChunks);
+
+    assert(scheduler.nextFrame == scheduler.numFramesInJob);
+
+    time(&shutDownBegin);
+
+    stopIoServers(hostName, ioPortNum, numIoServers);
+
+    waitForCombineServerToTerminate(masterSocket);
+
+    close(masterSocket);
+
+    time(&shutDownEnd);
+
+    printFinalStats(statFile, startUpBegin, startUpEnd,
+                    shutDownBegin, shutDownEnd, numMachines,
+                    childState, inputSourceP->numInputFiles);
+
+    if (statFile)
+        fclose(statFile);
+
+    free(childState);
+    strfree(hostName);
+}
+
+
+
+void
+NotifyMasterDone(const char * const masterHostName, 
+                 int          const masterPortNum, 
+                 int          const childNum,
+                 unsigned int const seconds, 
+                 boolean *    const moreWorkToDoP,
+                 int *        const nextFrameStartP,
+                 int *        const nextFrameEndP) {
+/*----------------------------------------------------------------------------
+   Tell the master, at 'masterHostName':'masterPortNum' that child
+   number 'childNum' has finished its assignment, and the decoding
+   took 'seconds' wall clock seconds.  Get the next assignment, if
+   any, from the master.
+
+   If the master gives us a new assignment, return *moreWorkToDoP ==
+   TRUE and the frames the master wants us to do as *nextFrameStartP
+   and nextFrameEndP.  Otherwise (there is no more work for machine
+   'childNum' to do), return *moreWorkToDoP == FALSE.
+-----------------------------------------------------------------------------*/
+    int    clientSocket;
+    time_t tempTimeStart, tempTimeEnd;
+    const char * error;
+
+    machineDebug("CHILD: NOTIFYING MASTER Machine %d assignment complete", 
+                 childNum);
+
+    time(&tempTimeStart);
+    
+    ConnectToSocket(masterHostName, masterPortNum, &hostEntry,
+                    &clientSocket, &error);
+    if (error)
+        errorExit("CHILD: Can't connect to master to tell him we've finished "
+                  "our assignment.  %s", error);
+
+    WriteInt(clientSocket, childNum);
+    WriteInt(clientSocket, seconds);
+
+    ReadInt(clientSocket, nextFrameStartP);
+    ReadInt(clientSocket, nextFrameEndP);
+
+    *moreWorkToDoP = (*nextFrameStartP >= 0);
+
+    if (*moreWorkToDoP)
+        machineDebug("CHILD: Master says next assignment: start %d end %d",
+                     *nextFrameStartP, *nextFrameEndP);
+    else
+        machineDebug("CHILD: Master says no more work for us.");
+
+    close(clientSocket);
+
+    time(&tempTimeEnd);
+    IOtime += (tempTimeEnd-tempTimeStart);
+}
+
+
+
+void
+DecodeServer(int          const numInputFiles, 
+             const char * const decodeFileName, 
+             const char * const masterHostName, 
+             int          const masterPortNum) {
+/*----------------------------------------------------------------------------
+   Execute the decode server.
+
+   The decode server handles transfer of decoded frames to/from processes.
+
+   It is necessary only if referenceFrame == DECODED_FRAME.
+
+   Communicate to the master at hostname 'masterHostName':'masterPortNum'.
+
+-----------------------------------------------------------------------------*/
+    int     otherSock;
+    int     decodePortNum;
+    int     frameReady;
+    boolean *ready;
+    int     *waitMachine;
+    int     *waitPort;
+    int     *waitList;
+    int     slaveNumber;
+    int     slavePort;
+    int     waitPtr;
+    struct hostent *nullHost = NULL;
+    int     clientSocket;
+    const char * error;
+
+    /* should keep list of port numbers to notify when frames become ready */
+
+    ready = (boolean *) calloc(numInputFiles, sizeof(boolean));
+    waitMachine = (int *) calloc(numInputFiles, sizeof(int));
+    waitPort = (int *) malloc(numMachines*sizeof(int));
+    waitList = (int *) calloc(numMachines, sizeof(int));
+
+    CreateListeningSocket(&decodeServerSocket, &decodePortNum, &error);
+    if (error)
+        errorExit("Unable to create socket on which to listen.  %s", error);
+
+    machineDebug("DECODE SERVER LISTENING ON PORT %d", decodePortNum);
+
+    TransmitPortNum(masterHostName, masterPortNum, decodePortNum);
+
+    frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
+    memset((char *)frameDone, 0, numInputFiles*sizeof(boolean));
+
+    /* wait for ready signals and requests */
+    while ( TRUE ) {
+        const char * error;
+
+        AcceptConnection(decodeServerSocket, &otherSock, &error);
+        if (error)
+            errorExit("DECODE SERVER: "
+                      "Failed to accept next connection.  %s", error);
+
+        ReadInt(otherSock, &frameReady);
+
+        if ( frameReady == -2 ) {
+            ReadInt(otherSock, &frameReady);
+
+            machineDebug("DECODE SERVER:  REQUEST FOR FRAME %d", frameReady);
+
+            /* now respond if it's ready yet */
+            WriteInt(otherSock, frameDone[frameReady]);
+
+            if ( ! frameDone[frameReady] ) {
+                /* read machine number, port number */
+                ReadInt(otherSock, &slaveNumber);
+                ReadInt(otherSock, &slavePort);
+
+                machineDebug("DECODE SERVER: WAITING:  SLAVE %d, PORT %d",
+                             slaveNumber, slavePort);
+
+                waitPort[slaveNumber] = slavePort;
+                if ( waitMachine[frameReady] == 0 ) {
+                    waitMachine[frameReady] = slaveNumber+1;
+                } else {
+                    /* someone already waiting for this frame */
+                    /* follow list of waiters to the end */
+                    waitPtr = waitMachine[frameReady]-1;
+                    while ( waitList[waitPtr] != 0 ) {
+                        waitPtr = waitList[waitPtr]-1;
+                    }
+
+                    waitList[waitPtr] = slaveNumber+1;
+                    waitList[slaveNumber] = 0;
+                }
+            }
+        } else {
+            frameDone[frameReady] = TRUE;
+            
+            machineDebug("DECODE SERVER:  FRAME %d READY", frameReady);
+
+            if ( waitMachine[frameReady] ) {
+                /* need to notify one or more machines it's ready */
+                waitPtr = waitMachine[frameReady]-1;
+                while ( waitPtr >= 0 ) {
+                    const char * error;
+                    ConnectToSocket(machineName[waitPtr], waitPort[waitPtr],
+                                    &nullHost,
+                                    &clientSocket, &error);
+                    if (error)
+                        errorExit("DECODE SERVER: "
+                                  "Can't connect to child machine.  %s", 
+                                  error);
+                    close(clientSocket);
+                    waitPtr = waitList[waitPtr]-1;
+                }
+            }
+        }
+
+        close(otherSock);
+    }
+    
+    machineDebug("DECODE SERVER:  Shutting down");
+
+    /* tell Master server we are done */
+    TransmitPortNum(masterHostName, masterPortNum, decodePortNum);
+
+    close(decodeServerSocket);
+}
+
+
+
+/*
+ * Copyright (c) 1995 The Regents of the University of California.
+ * All rights reserved.
+ *
+ * Permission to use, copy, modify, and distribute this software and its
+ * documentation for any purpose, without fee, and without written agreement is
+ * hereby granted, provided that the above copyright notice and the following
+ * two paragraphs appear in all copies of this software.
+ *
+ * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
+ * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
+ * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
+ * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
+ * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+ * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
+ * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
+ * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+ */