diff options
Diffstat (limited to 'converter/ppm/ppmtompeg/parallel.c')
-rw-r--r-- | converter/ppm/ppmtompeg/parallel.c | 379 |
1 files changed, 190 insertions, 189 deletions
diff --git a/converter/ppm/ppmtompeg/parallel.c b/converter/ppm/ppmtompeg/parallel.c index e3bcec1a..0fe635ed 100644 --- a/converter/ppm/ppmtompeg/parallel.c +++ b/converter/ppm/ppmtompeg/parallel.c @@ -1,8 +1,8 @@ /*===========================================================================* - * parallel.c - * - * Procedures to make encoder run in parallel - * + * parallel.c + * + * Procedures to make encoder run in parallel + * *===========================================================================*/ /* COPYRIGHT INFORMATION IS AT THE END OF THIS FILE */ @@ -13,8 +13,8 @@ *==============*/ #define _XOPEN_SOURCE 500 /* Make sure stdio.h contains pclose() */ -/* _ALL_SOURCE is needed on AIX to make the C library include the - socket services (e.g. define struct sockaddr) +/* _ALL_SOURCE is needed on AIX to make the C library include the + socket services (e.g. define struct sockaddr) Note that AIX standards.h actually sets feature declaration macros such as _XOPEN_SOURCE, unless they are already set. @@ -93,7 +93,8 @@ struct scheduler { * CONSTANTS * *==================*/ -#define TERMINATE_PID_SIGNAL SIGTERM /* signal used to terminate forked childs */ +#define TERMINATE_PID_SIGNAL SIGTERM + /* signal used to terminate forked children */ #ifndef MAXARGS #define MAXARGS 1024 /* Max Number of arguments in safe_fork command */ #endif @@ -187,8 +188,8 @@ errorExit(const char format[], ...) { static void -TransmitPortNum(const char * const hostName, - int const portNum, +TransmitPortNum(const char * const hostName, + int const portNum, int const newPortNum) { /*---------------------------------------------------------------------------- Transmit the port number 'newPortNum' to the master on port 'portNum' @@ -196,15 +197,15 @@ TransmitPortNum(const char * const hostName, -----------------------------------------------------------------------------*/ int clientSocket; const char * error; - + ConnectToSocket(hostName, portNum, &hostEntry, &clientSocket, &error); - + if (error) errorExit("Can't connect in order to transmit port number. %s", error); WriteInt(clientSocket, newPortNum); - + close(clientSocket); } @@ -215,19 +216,19 @@ readYUVDecoded(int const socketFd, unsigned int const Fsize_x, unsigned int const Fsize_y, MpegFrame * const frameP) { - + unsigned int y; - + for (y = 0; y < Fsize_y; ++y) /* Y */ - ReadBytes(socketFd, + ReadBytes(socketFd, (unsigned char *)frameP->decoded_y[y], Fsize_x); - + for (y = 0; y < (Fsize_y >> 1); ++y) /* U */ - ReadBytes(socketFd, + ReadBytes(socketFd, (unsigned char *)frameP->decoded_cb[y], (Fsize_x >> 1)); - + for (y = 0; y < (Fsize_y >> 1); ++y) /* V */ - ReadBytes(socketFd, + ReadBytes(socketFd, (unsigned char *)frameP->decoded_cr[y], (Fsize_x >> 1)); } @@ -238,19 +239,19 @@ writeYUVDecoded(int const socketFd, unsigned int const Fsize_x, unsigned int const Fsize_y, MpegFrame * const frameP) { - + unsigned int y; - + for (y = 0; y < Fsize_y; ++y) /* Y */ - WriteBytes(socketFd, + WriteBytes(socketFd, (unsigned char *)frameP->decoded_y[y], Fsize_x); - + for (y = 0; y < (Fsize_y >> 1); ++y) /* U */ - WriteBytes(socketFd, + WriteBytes(socketFd, (unsigned char *)frameP->decoded_cb[y], (Fsize_x >> 1)); - + for (y = 0; y < (Fsize_y >> 1); ++y) /* V */ - WriteBytes(socketFd, + WriteBytes(socketFd, (unsigned char *)frameP->decoded_cr[y], (Fsize_x >> 1)); } @@ -261,19 +262,19 @@ writeYUVOrig(int const socketFd, unsigned int const Fsize_x, unsigned int const Fsize_y, MpegFrame * const frameP) { - + unsigned int y; - + for (y = 0; y < Fsize_y; ++y) /* Y */ - WriteBytes(socketFd, + WriteBytes(socketFd, (unsigned char *)frameP->orig_y[y], Fsize_x); - + for (y = 0; y < (Fsize_y >> 1); ++y) /* U */ - WriteBytes(socketFd, + WriteBytes(socketFd, (unsigned char *)frameP->orig_cb[y], (Fsize_x >> 1)); - + for (y = 0; y < (Fsize_y >> 1); ++y) /* V */ - WriteBytes(socketFd, + WriteBytes(socketFd, (unsigned char *)frameP->orig_cr[y], (Fsize_x >> 1)); } @@ -284,19 +285,19 @@ readYUVOrig(int const socketFd, unsigned int const Fsize_x, unsigned int const Fsize_y, MpegFrame * const frameP) { - + unsigned int y; - + for (y = 0; y < Fsize_y; ++y) /* Y */ - ReadBytes(socketFd, + ReadBytes(socketFd, (unsigned char *)frameP->orig_y[y], Fsize_x); - + for (y = 0; y < (Fsize_y >> 1); ++y) /* U */ - ReadBytes(socketFd, + ReadBytes(socketFd, (unsigned char *)frameP->orig_cb[y], (Fsize_x >> 1)); - + for (y = 0; y < (Fsize_y >> 1); ++y) /* V */ - ReadBytes(socketFd, + ReadBytes(socketFd, (unsigned char *)frameP->orig_cr[y], (Fsize_x >> 1)); } @@ -340,20 +341,20 @@ NotifyDecodeServerReady(int const id) { int clientSocket; time_t tempTimeStart, tempTimeEnd; const char * error; - + time(&tempTimeStart); - + ConnectToSocket(IOhostName, decodePortNumber, &hostEntry, &clientSocket, &error); - + if (error) errorExit("CHILD: Can't connect to decode server to tell it a frame " "is ready. %s", error); - + WriteInt(clientSocket, id); - + close(clientSocket); - + time(&tempTimeEnd); IOtime += (tempTimeEnd-tempTimeStart); } @@ -461,23 +462,23 @@ SendDecodedFrame(MpegFrame * const frameP) { Send frame *frameP to the decode server. -----------------------------------------------------------------------------*/ int const negativeTwo = -2; - + int clientSocket; const char * error; - + /* send to IOServer */ ConnectToSocket(IOhostName, ioPortNumber, &hostEntry, &clientSocket, &error); if (error) errorExit("CHILD: Can't connect to decode server to " "give it a decoded frame. %s", error); - + WriteInt(clientSocket, negativeTwo); - + WriteInt(clientSocket, frameP->id); writeYUVDecoded(clientSocket, Fsize_x, Fsize_y, frameP); - + close(clientSocket); } @@ -490,11 +491,11 @@ SendDecodedFrame(MpegFrame * const frameP) { * * RETURNS: nothing * - * SIDE EFFECTS: + * SIDE EFFECTS: * *===========================================================================*/ void -GetRemoteDecodedRefFrame(MpegFrame * const frameP, +GetRemoteDecodedRefFrame(MpegFrame * const frameP, int const frameNumber) { /*---------------------------------------------------------------------------- Get decoded frame number 'frameNumber' *frameP from the decode server. @@ -551,7 +552,7 @@ static void cleanup_fork( dummy ) /* try to kill all child processes */ #endif if (kill(ClientPid[i], TERMINATE_PID_SIGNAL)) { - fprintf(stderr, "cleanup_fork: killed PID=%d failed (errno %d)\n", + fprintf(stderr, "cleanup_fork: killed PID=%d failed (errno %d)\n", ClientPid[i], errno); } } @@ -574,24 +575,24 @@ static int safe_fork(command) /* fork child process and remember its PID * static int init=0; char *argis[MAXARGS]; register int i=1; - + if (!(argis[0] = strtok(command, " \t"))) return(0); /* tokenize */ while ((argis[i] = strtok(NULL, " \t")) && i < MAXARGS) ++i; argis[i] = NULL; - + #ifdef DEBUG_FORK - {register int i=0; + {register int i=0; fprintf(stderr, "Command %s becomes:\n", command); while(argis[i]) {fprintf(stderr, "--%s--\n", argis[i]); ++i;} } #endif - + if (!init) { /* register clean-up routine */ signal (SIGQUIT, cleanup_fork); signal (SIGTERM, cleanup_fork); signal (SIGINT , cleanup_fork); init=1; } - + if (-1 == (ClientPid[current_max_forked_pid] = fork()) ) { perror("safe_fork: fork failed "); return(-1); @@ -682,25 +683,25 @@ decodedFrameToDisk(int const otherSock) { MpegFrame * frameP; ReadInt(otherSock, &frameNumber); - + if (debugSockets) { - fprintf(stdout, "INPUT SERVER: GETTING DECODED FRAME %d\n", + fprintf(stdout, "INPUT SERVER: GETTING DECODED FRAME %d\n", frameNumber); fflush(stdout); } /* should read frame from socket, then write to disk */ frameP = Frame_New(frameNumber, 'i'); - + Frame_AllocDecoded(frameP, TRUE); - + readYUVDecoded(otherSock, Fsize_x, Fsize_y, frameP); /* now output to disk */ WriteDecodedFrame(frameP); Frame_Free(frameP); -} +} @@ -708,12 +709,12 @@ static void decodedFrameFromDisk(int const otherSock) { /* request for decoded frame from disk */ - + int frameNumber; MpegFrame * frameP; ReadInt(otherSock, &frameNumber); - + if (debugSockets) { fprintf(stdout, "INPUT SERVER: READING DECODED FRAME %d " "from DISK\n", frameNumber); @@ -722,15 +723,15 @@ decodedFrameFromDisk(int const otherSock) { /* should read frame from disk, then write to socket */ frameP = Frame_New(frameNumber, 'i'); - + Frame_AllocDecoded(frameP, TRUE); - + ReadDecodedRefFrame(frameP, frameNumber); - + writeYUVDecoded(otherSock, Fsize_x, Fsize_y, frameP); - + Frame_Free(frameP); -} +} @@ -762,10 +763,10 @@ routeFromSocketToDisk(int const otherSock, MALLOCARRAY_NOFAIL(bigBuffer, bigBufferSize); } - + /* now read in the bytes */ ReadBytes(otherSock, bigBuffer, numBytes); - + /* open file to output this stuff to */ pm_asprintf(&fileName, "%s.frame.%d", outputFileName, frameNumber); filePtr = fopen(fileName, "wb"); @@ -777,9 +778,9 @@ routeFromSocketToDisk(int const otherSock, /* now write the bytes here */ fwrite(bigBuffer, sizeof(char), numBytes, filePtr); - + fclose(filePtr); - + if (debugSockets) { fprintf(stdout, "====I/O SERVER: WROTE FRAME %d to disk\n", frameNumber); @@ -788,7 +789,7 @@ routeFromSocketToDisk(int const otherSock, *bigBufferP = bigBuffer; *bigBufferSizeP = bigBufferSize; -} +} @@ -803,7 +804,7 @@ readConvertWriteToSocket(struct inputSource * const inputSourceP, the "base format" result to socket 'otherSock'. -----------------------------------------------------------------------------*/ FILE * convertedFileP; - + convertedFileP = ReadIOConvert(inputSourceP, frameNumber); if (convertedFileP) { bool eof; @@ -811,21 +812,21 @@ readConvertWriteToSocket(struct inputSource * const inputSourceP, while (!eof) { unsigned char buffer[1024]; unsigned int numBytes; - + numBytes = fread(buffer, 1, sizeof(buffer), convertedFileP); - + if (numBytes > 0) { WriteInt(otherSock, numBytes); WriteBytes(otherSock, buffer, numBytes); } else eof = TRUE; } - + if (strcmp(ioConversion, "*") == 0 ) fclose(convertedFileP); else pclose(convertedFileP); - + *endOfStreamP = FALSE; } else *endOfStreamP = TRUE; @@ -848,13 +849,13 @@ readWriteYuvToSocket(struct inputSource * const inputSourceP, MpegFrame * frameP; frameP = Frame_New(frameNumber, 'i'); - + ReadFrame(frameP, inputSourceP, frameNumber, inputConversion, endOfStreamP); - + if (!*endOfStreamP) { writeYUVOrig(otherSock, Fsize_x, Fsize_y, frameP); - + { /* Make sure we don't leave until other processor read everything @@ -907,7 +908,7 @@ processNextConnection(int const serverSocket, int otherSock; int command; const char * error; - + AcceptConnection(serverSocket, &otherSock, &error); if (error) errorExit("I/O SERVER: Failed to accept next connection. %s", error); @@ -948,12 +949,12 @@ processNextConnection(int const serverSocket, } close(otherSock); } - + void IoServer(struct inputSource * const inputSourceP, - const char * const parallelHostName, + const char * const parallelHostName, int const portNum) { /*---------------------------------------------------------------------------- Execute an I/O server. @@ -979,7 +980,7 @@ IoServer(struct inputSource * const inputSourceP, bigBufferSize = 0; /* Start with no buffer */ bigBuffer = NULL; - + /* once we get IO port num, should transmit it to parallel server */ CreateListeningSocket(&serverSocket, &ioPortNum, &error); @@ -1043,12 +1044,12 @@ SendRemoteFrame(int const frameNumber, BitBucket * const bb) { WriteInt(clientSocket, negativeFour); WriteInt(clientSocket, frameNumber); - + if (frameNumber != -1) { /* send number of bytes */ - + WriteInt(clientSocket, (bb->totalbits+7)>>3); - + /* now send the bytes themselves */ Bitio_WriteToSocket(bb, clientSocket); } @@ -1062,11 +1063,11 @@ SendRemoteFrame(int const frameNumber, BitBucket * const bb) { void -GetRemoteFrame(MpegFrame * const frameP, +GetRemoteFrame(MpegFrame * const frameP, int const frameNumber) { /*---------------------------------------------------------------------------- Get a frame from the I/O server. - + This is intended for use by a child. -----------------------------------------------------------------------------*/ int clientSocket; @@ -1084,7 +1085,7 @@ GetRemoteFrame(MpegFrame * const frameP, &clientSocket, &error); if (error) - errorExit("CHILD: Can't connect to I/O server to get a frame. %s", + errorExit("CHILD: Can't connect to I/O server to get a frame. %s", error); WriteInt(clientSocket, frameNumber); @@ -1104,7 +1105,7 @@ GetRemoteFrame(MpegFrame * const frameP, if (numBytes > sizeof(buffer)) errorExit("Invalid message received: numBytes = %d, " - "which is greater than %u", + "which is greater than %u", numBytes, (unsigned)sizeof(numBytes)); ReadBytes(clientSocket, buffer, numBytes); @@ -1162,9 +1163,9 @@ getAndProcessACombineConnection(int const outputServerSocket) { if (error) errorExit("COMBINE SERVER: " "Failed to accept next connection. %s", error); - + ReadInt(otherSock, &command); - + if (command == -2) { /* this is notification from non-remote process that a frame is done. @@ -1173,7 +1174,7 @@ getAndProcessACombineConnection(int const outputServerSocket) { ReadInt(otherSock, &frameStart); ReadInt(otherSock, &frameEnd); - + machineDebug("COMBINE_SERVER: Frames %d - %d done", frameStart, frameEnd); { @@ -1199,7 +1200,7 @@ openInputFile(const char * const fileName, FILE * inputFileP; unsigned int attempts; - + inputFileP = NULL; attempts = 0; @@ -1207,14 +1208,14 @@ openInputFile(const char * const fileName, inputFileP = fopen(fileName, "rb"); if (inputFileP == NULL) { pm_message("ERROR Couldn't read frame file '%s' errno = %d (%s)" - "attempt %d", + "attempt %d", fileName, errno, strerror(errno), attempts); pm_sleep(1000); } ++attempts; } if (inputFileP == NULL) - pm_error("Unable to open file '%s' after %d attempts.", + pm_error("Unable to open file '%s' after %d attempts.", fileName, attempts); *inputFilePP = inputFileP; @@ -1239,7 +1240,7 @@ waitForOutputFile(void * const inputHandle, const char * fileName; while (!frameDone[frameNumber]) { - machineDebug("COMBINE_SERVER: Waiting for frame %u done", + machineDebug("COMBINE_SERVER: Waiting for frame %u done", frameNumber); getAndProcessACombineConnection(outputServerSocket); @@ -1274,8 +1275,8 @@ unlinkFile(void * const inputHandle, void -CombineServer(int const numFrames, - const char * const masterHostName, +CombineServer(int const numFrames, + const char * const masterHostName, int const masterPortNum, const char * const outputFileName) { /*---------------------------------------------------------------------------- @@ -1287,17 +1288,17 @@ CombineServer(int const numFrames, FILE * ofP; const char * error; struct combineControl combineControl; - + /* once we get Combine port num, should transmit it to parallel server */ - + CreateListeningSocket(&outputServerSocket, &combinePortNum, &error); if (error) errorExit("Unable to create socket on which to listen. %s", error); machineDebug("COMBINE SERVER: LISTENING ON PORT %d", combinePortNum); - + TransmitPortNum(masterHostName, masterPortNum, combinePortNum); - + MALLOCARRAY_NOFAIL(frameDone, numFrames); { unsigned int i; @@ -1305,16 +1306,16 @@ CombineServer(int const numFrames, frameDone[i] = FALSE; } ofP = pm_openw(outputFileName); - + combineControl.numFrames = numFrames; FramesToMPEG(ofP, &combineControl, &waitForOutputFile, &unlinkFile); machineDebug("COMBINE SERVER: Shutting down"); - + /* tell Master server we are done */ TransmitPortNum(masterHostName, masterPortNum, combinePortNum); - + close(outputServerSocket); fclose(ofP); @@ -1340,18 +1341,18 @@ startCombineServer(const char * const encoderName, int otherSock; const char * error; - pm_snprintf(command, sizeof(command), + pm_snprintf(command, sizeof(command), "%s %s -max_machines %d -output_server %s %d %d %s", - encoderName, + encoderName, debugMachines ? "-debug_machines" : "", - numMachines, masterHostName, masterPortNum, + numMachines, masterHostName, masterPortNum, numInputFiles, paramFileName); - + machineDebug("MASTER: Starting combine server with shell command '%s'", command); safe_fork(command); - + machineDebug("MASTER: Listening for connection back from " "new Combine server"); @@ -1382,9 +1383,9 @@ startDecodeServer(const char * const encoderName, int otherSock; const char * error; - pm_snprintf(command, sizeof(command), + pm_snprintf(command, sizeof(command), "%s %s -max_machines %d -decode_server %s %d %d %s", - encoder_name, + encoder_name, debugMachines ? "-debug_machines" : "", numMachines, masterHostName, masterPortNum, numInputFiles, paramFileName); @@ -1404,9 +1405,9 @@ startDecodeServer(const char * const encoderName, "decode server. %s", error); ReadInt(otherSock, decodePortNumP); - + close(otherSock); - + machineDebug("MASTER: Decode port number = %d", *decodePortNumP); } @@ -1420,11 +1421,11 @@ startIoServer(const char * const encoderName, int const masterSocket, const char * const paramFileName, int * const ioPortNumP) { - + char command[1024]; int otherSock; const char * error; - + sprintf(command, "%s -max_machines %d -io_server %s %d %s", encoderName, numChildren, masterHostName, masterPortNum, paramFileName); @@ -1433,7 +1434,7 @@ startIoServer(const char * const encoderName, command); safe_fork(command); - + machineDebug("MASTER: Listening for connection back from " "new I/O server"); @@ -1442,14 +1443,14 @@ startIoServer(const char * const encoderName, errorExit("MASTER SERVER: " "Failed to accept connection back from the new " "I/O server. %s", error); - + ReadInt(otherSock, ioPortNumP); close(otherSock); - + machineDebug("MASTER: I/O port number = %d", *ioPortNumP); -} - - +} + + static void extendToEndOfPattern(unsigned int * const nFramesP, @@ -1458,7 +1459,7 @@ extendToEndOfPattern(unsigned int * const nFramesP, unsigned int const numFramesInStream) { assert(framePatternLen >= 1); - + while (startFrame + *nFramesP < numFramesInStream && (startFrame + *nFramesP) % framePatternLen != 0) ++(*nFramesP); @@ -1478,7 +1479,7 @@ allocateInitialFrames(struct scheduler * const schedulerP, /*---------------------------------------------------------------------------- Choose which frames, to hand out to the new child numbered 'childNum'. -----------------------------------------------------------------------------*/ - unsigned int const framesPerChild = + unsigned int const framesPerChild = MAX(1, ((schedulerP->numFramesInJob - schedulerP->nextFrame) / (schedulerP->numMachines - childNum))); @@ -1507,7 +1508,7 @@ allocateInitialFrames(struct scheduler * const schedulerP, static float taperedGoalTime(struct childState const childState[], unsigned int const remainingFrameCount) { - + float goalTime; float allChildrenFPS; float remainingJobTime; @@ -1515,9 +1516,9 @@ taperedGoalTime(struct childState const childState[], float sum; int numMachinesToEstimate; unsigned int childNum; - + /* frames left = lastFrameInStream - startFrame + 1 */ - for (childNum = 0, sum = 0.0, numMachinesToEstimate = 0; + for (childNum = 0, sum = 0.0, numMachinesToEstimate = 0; childNum < numMachines; ++childNum) { if (!childState[childNum].finished) { if (childState[childNum].fps < 0.0 ) @@ -1526,12 +1527,12 @@ taperedGoalTime(struct childState const childState[], sum += childState[childNum].fps; } } - + allChildrenFPS = (float)numMachines * (sum/(float)(numMachines-numMachinesToEstimate)); - + remainingJobTime = (float)remainingFrameCount/allChildrenFPS; - + goalTime = MAX(5.0, remainingJobTime/2); return goalTime; @@ -1575,23 +1576,23 @@ allocateMoreFrames(struct scheduler * const schedulerP, if (!goalTimeSpecified) { goalTime = taperedGoalTime(childState, - schedulerP->numFramesInJob - + schedulerP->numFramesInJob - schedulerP->nextFrame); - + pm_message("MASTER: ASSIGNING %s %.2f seconds of work", machineName[childNum], goalTime); } else goalTime = goalTimeArg; - + if (childState[childNum].numSeconds != 0) - avgFps = (float)childState[childNum].numFrames / + avgFps = (float)childState[childNum].numFrames / childState[childNum].numSeconds; else avgFps = 0.1; /* arbitrary small value */ nFrames = MAX(1u, (unsigned int)(goalTime * avgFps + 0.5)); - - nFrames = MIN(nFrames, + + nFrames = MIN(nFrames, schedulerP->numFramesInJob - schedulerP->nextFrame); if (forceIalign) @@ -1648,7 +1649,7 @@ startChildren(struct scheduler * const schedulerP, MALLOCARRAY_NOFAIL(childState, schedulerP->numMachines); childrenLeftCurrentIoServer = 0; /* No current I/O server yet */ - + numIoServers = 0; /* None created yet */ for (childNum = 0; childNum < schedulerP->numMachines; ++childNum) { @@ -1669,17 +1670,17 @@ startChildren(struct scheduler * const schedulerP, machineName[childNum]); } else { childState[childNum].finished = FALSE; - + if (remote[childNum]) { if (childrenLeftCurrentIoServer == 0) { - startIoServer(encoderName, schedulerP->numMachines, + startIoServer(encoderName, schedulerP->numMachines, masterHostName, masterPortNum, masterSocket, paramFileName, &ioPortNum[numIoServers++]); - + childrenLeftCurrentIoServer = SOMAXCONN; } --childrenLeftCurrentIoServer; - } + } pm_snprintf(command, sizeof(command), "%s %s -l %s %s " "%s %s -child %s %d %d %d %d %d %d " @@ -1689,22 +1690,22 @@ startChildren(struct scheduler * const schedulerP, beNice ? "nice" : "", executable[childNum], debugMachines ? "-debug_machines" : "", - masterHostName, masterPortNum, + masterHostName, masterPortNum, remote[childNum] ? ioPortNum[numIoServers-1] : 0, combinePortNum, decodePortNum, childNum, remote[childNum] ? 1 : 0, startFrame, startFrame + nFrames - 1, - remote[childNum] ? + remote[childNum] ? remoteParamFile[childNum] : paramFileName ); - + machineDebug("MASTER: Starting child server " "with shell command '%s'", command); safe_fork(command); machineDebug("MASTER: Frames %d-%d assigned to new child %s", - startFrame, startFrame + nFrames - 1, + startFrame, startFrame + nFrames - 1, machineName[childNum]); } childState[childNum].startFrame = startFrame; @@ -1720,7 +1721,7 @@ startChildren(struct scheduler * const schedulerP, static void noteFrameDone(const char * const combineHostName, int const combinePortNum, - unsigned int const frameStart, + unsigned int const frameStart, unsigned int const frameEnd) { /*---------------------------------------------------------------------------- Tell the Combine server that frames 'frameStart' through 'frameEnd' @@ -1738,7 +1739,7 @@ noteFrameDone(const char * const combineHostName, ConnectToSocket(combineHostName, combinePortNum, &hostEntP, &clientSocket, &error); - + if (error) errorExit("MASTER: Can't connect to Combine server to tell it frames " "are done. %s", error); @@ -1775,7 +1776,7 @@ feedTheChildren(struct scheduler * const schedulerP, As children finish assignments, inform the combine server at 'combineHostName':'combinePortNum' of such. - Note that the children got initial assigments when they were created. + Note that the children got initial assignments when they were created. So the first thing we do is wait for them to finish those. -----------------------------------------------------------------------------*/ unsigned int numFinished; @@ -1808,7 +1809,7 @@ feedTheChildren(struct scheduler * const schedulerP, ReadInt(otherSock, &seconds); csP = &childState[childNum]; - + csP->numSeconds += seconds; csP->fps = (float)csP->numFrames / (float)csP->numSeconds; @@ -1818,7 +1819,7 @@ feedTheChildren(struct scheduler * const schedulerP, framesPerSecond = (float)csP->lastNumFrames * 2.0; machineDebug("MASTER: Child %s FINISHED ASSIGNMENT. " - "%f frames per second", + "%f frames per second", machineName[childNum], framesPerSecond); noteFrameDone(combineHostName, combinePortNum, csP->startFrame, @@ -1855,7 +1856,7 @@ feedTheChildren(struct scheduler * const schedulerP, close(otherSock); machineDebug("MASTER: %d/%d DONE; %d ARE ASSIGNED", - framesDone, schedulerP->numFramesInJob, + framesDone, schedulerP->numFramesInJob, schedulerP->nextFrame - framesDone); } } @@ -1898,7 +1899,7 @@ waitForCombineServerToTerminate(int const masterSocket) { } close(otherSock); } - + static void @@ -1932,46 +1933,46 @@ printFinalStats(FILE * const statfileP, (unsigned int)(startUpEnd - startUpBegin)); fprintf(fileP, "SHUT DOWN TIME: %u seconds\n", (unsigned int)(shutDownEnd - shutDownBegin)); - - fprintf(fileP, + + fprintf(fileP, "%14.14s %8.8s %8.8s %12.12s %9.9s\n", "MACHINE", "Frames", "Seconds", "Frames/Sec", "Self Time"); - fprintf(fileP, + fprintf(fileP, "%14.14s %8.8s %8.8s %12.12s %9.9s\n", "--------------", "--------", "--------", "------------", "---------"); totalFPS = 0.0; for (childNum = 0; childNum < numChildren; ++childNum) { - float const localFPS = + float const localFPS = (float)childState[childNum].numFrames / childState[childNum].numSeconds; fprintf(fileP, "%14.14s %8u %8u %12.4f %8u\n", - machineName[childNum], - childState[childNum].numFrames, + machineName[childNum], + childState[childNum].numFrames, childState[childNum].numSeconds, - localFPS, + localFPS, (unsigned int)((float)numFrames/localFPS)); totalFPS += localFPS; } - fprintf(fileP, + fprintf(fileP, "%14.14s %8.8s %8.8s %12.12s %9.9s\n", "--------------", "--------", "--------", "------------", "---------"); - fprintf(fileP, "%14s %8.8s %8u %12.4f\n", - "OPTIMAL", "", + fprintf(fileP, "%14s %8.8s %8u %12.4f\n", + "OPTIMAL", "", (unsigned int)((float)numFrames/totalFPS), totalFPS); - + { unsigned int const diffTime = shutDownEnd - startUpBegin; - - fprintf(fileP, "%14s %8.8s %8u %12.4f\n", - "ACTUAL", "", diffTime, + + fprintf(fileP, "%14s %8.8s %8u %12.4f\n", + "ACTUAL", "", diffTime, (float)numFrames / diffTime); } fprintf(fileP, "\n\n"); @@ -1983,7 +1984,7 @@ printFinalStats(FILE * const statfileP, void MasterServer(struct inputSource * const inputSourceP, - const char * const paramFileName, + const char * const paramFileName, const char * const outputFileName) { /*---------------------------------------------------------------------------- Execute the master server function. @@ -2025,21 +2026,21 @@ MasterServer(struct inputSource * const inputSourceP, fprintf(stdout, "---MASTER USING PORT %d\n", portNum); startCombineServer(encoder_name, numMachines, hostName, portNum, - inputSourceP->numInputFiles, - paramFileName, masterSocket, + inputSourceP->numInputFiles, + paramFileName, masterSocket, &combinePortNum); if (referenceFrame == DECODED_FRAME) startDecodeServer(encoder_name, numMachines, hostName, portNum, - inputSourceP->numInputFiles, + inputSourceP->numInputFiles, paramFileName, masterSocket, &decodePortNum); startChildren(&scheduler, encoder_name, hostName, portNum, paramFileName, parallelPerfect, forceIalign, - framePatternLen, parallelTestFrames, + framePatternLen, parallelTestFrames, niceProcesses, - masterSocket, combinePortNum, decodePortNum, + masterSocket, combinePortNum, decodePortNum, ioPortNum, &numIoServers, &childState); @@ -2076,10 +2077,10 @@ MasterServer(struct inputSource * const inputSourceP, void -NotifyMasterDone(const char * const masterHostName, - int const masterPortNum, +NotifyMasterDone(const char * const masterHostName, + int const masterPortNum, int const childNum, - unsigned int const seconds, + unsigned int const seconds, boolean * const moreWorkToDoP, int * const nextFrameStartP, int * const nextFrameEndP) { @@ -2098,11 +2099,11 @@ NotifyMasterDone(const char * const masterHostName, time_t tempTimeStart, tempTimeEnd; const char * error; - machineDebug("CHILD: NOTIFYING MASTER Machine %d assignment complete", + machineDebug("CHILD: NOTIFYING MASTER Machine %d assignment complete", childNum); time(&tempTimeStart); - + ConnectToSocket(masterHostName, masterPortNum, &hostEntry, &clientSocket, &error); if (error) @@ -2132,9 +2133,9 @@ NotifyMasterDone(const char * const masterHostName, void -DecodeServer(int const numInputFiles, - const char * const decodeFileName, - const char * const masterHostName, +DecodeServer(int const numInputFiles, + const char * const decodeFileName, + const char * const masterHostName, int const masterPortNum) { /*---------------------------------------------------------------------------- Execute the decode server. @@ -2220,7 +2221,7 @@ DecodeServer(int const numInputFiles, } } else { frameDone[frameReady] = TRUE; - + machineDebug("DECODE SERVER: FRAME %d READY", frameReady); if ( waitMachine[frameReady] ) { @@ -2233,7 +2234,7 @@ DecodeServer(int const numInputFiles, &clientSocket, &error); if (error) errorExit("DECODE SERVER: " - "Can't connect to child machine. %s", + "Can't connect to child machine. %s", error); close(clientSocket); waitPtr = waitList[waitPtr]-1; @@ -2243,7 +2244,7 @@ DecodeServer(int const numInputFiles, close(otherSock); } - + machineDebug("DECODE SERVER: Shutting down"); /* tell Master server we are done */ |