testProgs/playCommon.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // Copyright (c) 1996-2013, Live Networks, Inc.  All rights reserved
00017 // A common framework, used for the "openRTSP" and "playSIP" applications
00018 // Implementation
00019 //
00020 // NOTE: If you want to develop your own RTSP client application (or embed RTSP client functionality into your own application),
00021 // then we don't recommend using this code as a model, because it is too complex (with many options).
00022 // Instead, we recommend using the "testRTSPClient" application code as a model.
00023 
00024 #include "playCommon.hh"
00025 #include "BasicUsageEnvironment.hh"
00026 #include "GroupsockHelper.hh"
00027 
00028 #if defined(__WIN32__) || defined(_WIN32)
00029 #define snprintf _snprintf
00030 #else
00031 #include <signal.h>
00032 #define USE_SIGNALS 1
00033 #endif
00034 
00035 // Forward function definitions:
00036 void continueAfterOPTIONS(RTSPClient* client, int resultCode, char* resultString);
00037 void continueAfterDESCRIBE(RTSPClient* client, int resultCode, char* resultString);
00038 void continueAfterSETUP(RTSPClient* client, int resultCode, char* resultString);
00039 void continueAfterPLAY(RTSPClient* client, int resultCode, char* resultString);
00040 void continueAfterTEARDOWN(RTSPClient* client, int resultCode, char* resultString);
00041 
00042 void setupStreams();
00043 void closeMediaSinks();
00044 void subsessionAfterPlaying(void* clientData);
00045 void subsessionByeHandler(void* clientData);
00046 void sessionAfterPlaying(void* clientData = NULL);
00047 void sessionTimerHandler(void* clientData);
00048 void shutdown(int exitCode = 1);
00049 void signalHandlerShutdown(int sig);
00050 void checkForPacketArrival(void* clientData);
00051 void checkInterPacketGaps(void* clientData);
00052 void beginQOSMeasurement();
00053 
00054 char const* progName;
00055 UsageEnvironment* env;
00056 Medium* ourClient = NULL;
00057 Authenticator* ourAuthenticator = NULL;
00058 char const* streamURL = NULL;
00059 MediaSession* session = NULL;
00060 TaskToken sessionTimerTask = NULL;
00061 TaskToken arrivalCheckTimerTask = NULL;
00062 TaskToken interPacketGapCheckTimerTask = NULL;
00063 TaskToken qosMeasurementTimerTask = NULL;
00064 Boolean createReceivers = True;
00065 Boolean outputQuickTimeFile = False;
00066 Boolean generateMP4Format = False;
00067 QuickTimeFileSink* qtOut = NULL;
00068 Boolean outputAVIFile = False;
00069 AVIFileSink* aviOut = NULL;
00070 Boolean audioOnly = False;
00071 Boolean videoOnly = False;
00072 char const* singleMedium = NULL;
00073 int verbosityLevel = 1; // by default, print verbose output
00074 double duration = 0;
00075 double durationSlop = -1.0; // extra seconds to play at the end
00076 double initialSeekTime = 0.0f;
00077 char* initialAbsoluteSeekTime = NULL;
00078 float scale = 1.0f;
00079 double endTime;
00080 unsigned interPacketGapMaxTime = 0;
00081 unsigned totNumPacketsReceived = ~0; // used if checking inter-packet gaps
00082 Boolean playContinuously = False;
00083 int simpleRTPoffsetArg = -1;
00084 Boolean sendOptionsRequest = True;
00085 Boolean sendOptionsRequestOnly = False;
00086 Boolean oneFilePerFrame = False;
00087 Boolean notifyOnPacketArrival = False;
00088 Boolean streamUsingTCP = False;
00089 unsigned short desiredPortNum = 0;
00090 portNumBits tunnelOverHTTPPortNum = 0;
00091 char* username = NULL;
00092 char* password = NULL;
00093 char* proxyServerName = NULL;
00094 unsigned short proxyServerPortNum = 0;
00095 unsigned char desiredAudioRTPPayloadFormat = 0;
00096 char* mimeSubtype = NULL;
00097 unsigned short movieWidth = 240; // default
00098 Boolean movieWidthOptionSet = False;
00099 unsigned short movieHeight = 180; // default
00100 Boolean movieHeightOptionSet = False;
00101 unsigned movieFPS = 15; // default
00102 Boolean movieFPSOptionSet = False;
00103 char const* fileNamePrefix = "";
00104 unsigned fileSinkBufferSize = 100000;
00105 unsigned socketInputBufferSize = 0;
00106 Boolean packetLossCompensate = False;
00107 Boolean syncStreams = False;
00108 Boolean generateHintTracks = False;
00109 unsigned qosMeasurementIntervalMS = 0; // 0 means: Don't output QOS data
00110 
00111 struct timeval startTime;
00112 
00113 void usage() {
00114   *env << "Usage: " << progName
00115        << " [-p <startPortNum>] [-r|-q|-4|-i] [-a|-v] [-V] [-d <duration>] [-D <max-inter-packet-gap-time> [-c] [-S <offset>] [-n] [-O]"
00116            << (controlConnectionUsesTCP ? " [-t|-T <http-port>]" : "")
00117        << " [-u <username> <password>"
00118            << (allowProxyServers ? " [<proxy-server> [<proxy-server-port>]]" : "")
00119        << "]" << (supportCodecSelection ? " [-A <audio-codec-rtp-payload-format-code>|-M <mime-subtype-name>]" : "")
00120        << " [-s <initial-seek-time>]|[-U <absolute-seek-time>] [-z <scale>]"
00121        << " [-w <width> -h <height>] [-f <frames-per-second>] [-y] [-H] [-Q [<measurement-interval>]] [-F <filename-prefix>] [-b <file-sink-buffer-size>] [-B <input-socket-buffer-size>] [-I <input-interface-ip-address>] [-m] <url> (or " << progName << " -o [-V] <url>)\n";
00122   shutdown();
00123 }
00124 
00125 int main(int argc, char** argv) {
00126   // Begin by setting up our usage environment:
00127   TaskScheduler* scheduler = BasicTaskScheduler::createNew();
00128   env = BasicUsageEnvironment::createNew(*scheduler);
00129 
00130   progName = argv[0];
00131 
00132   gettimeofday(&startTime, NULL);
00133 
00134 #ifdef USE_SIGNALS
00135   // Allow ourselves to be shut down gracefully by a SIGHUP or a SIGUSR1:
00136   signal(SIGHUP, signalHandlerShutdown);
00137   signal(SIGUSR1, signalHandlerShutdown);
00138 #endif
00139 
00140   // unfortunately we can't use getopt() here, as Windoze doesn't have it
00141   while (argc > 2) {
00142     char* const opt = argv[1];
00143     if (opt[0] != '-') usage();
00144     switch (opt[1]) {
00145 
00146     case 'p': { // specify start port number
00147       int portArg;
00148       if (sscanf(argv[2], "%d", &portArg) != 1) {
00149         usage();
00150       }
00151       if (portArg <= 0 || portArg >= 65536 || portArg&1) {
00152         *env << "bad port number: " << portArg
00153                 << " (must be even, and in the range (0,65536))\n";
00154         usage();
00155       }
00156       desiredPortNum = (unsigned short)portArg;
00157       ++argv; --argc;
00158       break;
00159     }
00160 
00161     case 'r': { // do not receive data (instead, just 'play' the stream(s))
00162       createReceivers = False;
00163       break;
00164     }
00165 
00166     case 'q': { // output a QuickTime file (to stdout)
00167       outputQuickTimeFile = True;
00168       break;
00169     }
00170 
00171     case '4': { // output a 'mp4'-format file (to stdout)
00172       outputQuickTimeFile = True;
00173       generateMP4Format = True;
00174       break;
00175     }
00176 
00177     case 'i': { // output an AVI file (to stdout)
00178       outputAVIFile = True;
00179       break;
00180     }
00181 
00182     case 'I': { // specify input interface...
00183       NetAddressList addresses(argv[2]);
00184       if (addresses.numAddresses() == 0) {
00185         *env << "Failed to find network address for \"" << argv[2] << "\"";
00186         break;
00187       }
00188       ReceivingInterfaceAddr = *(unsigned*)(addresses.firstAddress()->data());
00189       ++argv; --argc;
00190       break;
00191     }
00192 
00193     case 'a': { // receive/record an audio stream only
00194       audioOnly = True;
00195       singleMedium = "audio";
00196       break;
00197     }
00198 
00199     case 'v': { // receive/record a video stream only
00200       videoOnly = True;
00201       singleMedium = "video";
00202       break;
00203     }
00204 
00205     case 'V': { // disable verbose output
00206       verbosityLevel = 0;
00207       break;
00208     }
00209 
00210     case 'd': { // specify duration, or how much to delay after end time
00211       float arg;
00212       if (sscanf(argv[2], "%g", &arg) != 1) {
00213         usage();
00214       }
00215       if (argv[2][0] == '-') { // not "arg<0", in case argv[2] was "-0"
00216         // a 'negative' argument was specified; use this for "durationSlop":
00217         duration = 0; // use whatever's in the SDP
00218         durationSlop = -arg;
00219       } else {
00220         duration = arg;
00221         durationSlop = 0;
00222       }
00223       ++argv; --argc;
00224       break;
00225     }
00226 
00227     case 'D': { // specify maximum number of seconds to wait for packets:
00228       if (sscanf(argv[2], "%u", &interPacketGapMaxTime) != 1) {
00229         usage();
00230       }
00231       ++argv; --argc;
00232       break;
00233     }
00234 
00235     case 'c': { // play continuously
00236       playContinuously = True;
00237       break;
00238     }
00239 
00240     case 'S': { // specify an offset to use with "SimpleRTPSource"s
00241       if (sscanf(argv[2], "%d", &simpleRTPoffsetArg) != 1) {
00242         usage();
00243       }
00244       if (simpleRTPoffsetArg < 0) {
00245         *env << "offset argument to \"-S\" must be >= 0\n";
00246         usage();
00247       }
00248       ++argv; --argc;
00249       break;
00250     }
00251 
00252     case 'O': { // Don't send an "OPTIONS" request before "DESCRIBE"
00253       sendOptionsRequest = False;
00254       break;
00255     }
00256 
00257     case 'o': { // Send only the "OPTIONS" request to the server
00258       sendOptionsRequestOnly = True;
00259       break;
00260     }
00261 
00262     case 'm': { // output multiple files - one for each frame
00263       oneFilePerFrame = True;
00264       break;
00265     }
00266 
00267     case 'n': { // notify the user when the first data packet arrives
00268       notifyOnPacketArrival = True;
00269       break;
00270     }
00271 
00272     case 't': {
00273       // stream RTP and RTCP over the TCP 'control' connection
00274       if (controlConnectionUsesTCP) {
00275         streamUsingTCP = True;
00276       } else {
00277         usage();
00278       }
00279       break;
00280     }
00281 
00282     case 'T': {
00283       // stream RTP and RTCP over a HTTP connection
00284       if (controlConnectionUsesTCP) {
00285         if (argc > 3 && argv[2][0] != '-') {
00286           // The next argument is the HTTP server port number:
00287           if (sscanf(argv[2], "%hu", &tunnelOverHTTPPortNum) == 1
00288               && tunnelOverHTTPPortNum > 0) {
00289             ++argv; --argc;
00290             break;
00291           }
00292         }
00293       }
00294 
00295       // If we get here, the option was specified incorrectly:
00296       usage();
00297       break;
00298     }
00299 
00300     case 'u': { // specify a username and password
00301       if (argc < 4) usage(); // there's no argv[3] (for the "password")
00302       username = argv[2];
00303       password = argv[3];
00304       argv+=2; argc-=2;
00305       if (allowProxyServers && argc > 3 && argv[2][0] != '-') {
00306         // The next argument is the name of a proxy server:
00307         proxyServerName = argv[2];
00308         ++argv; --argc;
00309 
00310         if (argc > 3 && argv[2][0] != '-') {
00311           // The next argument is the proxy server port number:
00312           if (sscanf(argv[2], "%hu", &proxyServerPortNum) != 1) {
00313             usage();
00314           }
00315           ++argv; --argc;
00316         }
00317       }
00318 
00319       ourAuthenticator = new Authenticator(username, password);
00320       break;
00321     }
00322 
00323     case 'A': { // specify a desired audio RTP payload format
00324       unsigned formatArg;
00325       if (sscanf(argv[2], "%u", &formatArg) != 1
00326           || formatArg >= 96) {
00327         usage();
00328       }
00329       desiredAudioRTPPayloadFormat = (unsigned char)formatArg;
00330       ++argv; --argc;
00331       break;
00332     }
00333 
00334     case 'M': { // specify a MIME subtype for a dynamic RTP payload type
00335       mimeSubtype = argv[2];
00336       if (desiredAudioRTPPayloadFormat==0) desiredAudioRTPPayloadFormat =96;
00337       ++argv; --argc;
00338       break;
00339     }
00340 
00341     case 'w': { // specify a width (pixels) for an output QuickTime or AVI movie
00342       if (sscanf(argv[2], "%hu", &movieWidth) != 1) {
00343         usage();
00344       }
00345       movieWidthOptionSet = True;
00346       ++argv; --argc;
00347       break;
00348     }
00349 
00350     case 'h': { // specify a height (pixels) for an output QuickTime or AVI movie
00351       if (sscanf(argv[2], "%hu", &movieHeight) != 1) {
00352         usage();
00353       }
00354       movieHeightOptionSet = True;
00355       ++argv; --argc;
00356       break;
00357     }
00358 
00359     case 'f': { // specify a frame rate (per second) for an output QT or AVI movie
00360       if (sscanf(argv[2], "%u", &movieFPS) != 1) {
00361         usage();
00362       }
00363       movieFPSOptionSet = True;
00364       ++argv; --argc;
00365       break;
00366     }
00367 
00368     case 'F': { // specify a prefix for the audio and video output files
00369       fileNamePrefix = argv[2];
00370       ++argv; --argc;
00371       break;
00372     }
00373 
00374     case 'b': { // specify the size of buffers for "FileSink"s
00375       if (sscanf(argv[2], "%u", &fileSinkBufferSize) != 1) {
00376         usage();
00377       }
00378       ++argv; --argc;
00379       break;
00380     }
00381 
00382     case 'B': { // specify the size of input socket buffers
00383       if (sscanf(argv[2], "%u", &socketInputBufferSize) != 1) {
00384         usage();
00385       }
00386       ++argv; --argc;
00387       break;
00388     }
00389 
00390     // Note: The following option is deprecated, and may someday be removed:
00391     case 'l': { // try to compensate for packet loss by repeating frames
00392       packetLossCompensate = True;
00393       break;
00394     }
00395 
00396     case 'y': { // synchronize audio and video streams
00397       syncStreams = True;
00398       break;
00399     }
00400 
00401     case 'H': { // generate hint tracks (as well as the regular data tracks)
00402       generateHintTracks = True;
00403       break;
00404     }
00405 
00406     case 'Q': { // output QOS measurements
00407       qosMeasurementIntervalMS = 1000; // default: 1 second
00408 
00409       if (argc > 3 && argv[2][0] != '-') {
00410         // The next argument is the measurement interval,
00411         // in multiples of 100 ms
00412         if (sscanf(argv[2], "%u", &qosMeasurementIntervalMS) != 1) {
00413           usage();
00414         }
00415         qosMeasurementIntervalMS *= 100;
00416         ++argv; --argc;
00417       }
00418       break;
00419     }
00420 
00421     case 's': { // specify initial seek time (trick play)
00422       double arg;
00423       if (sscanf(argv[2], "%lg", &arg) != 1 || arg < 0) {
00424         usage();
00425       }
00426       initialSeekTime = arg;
00427       ++argv; --argc;
00428       break;
00429     }
00430 
00431     case 'U': {
00432       // specify initial absolute seek time (trick play), using a string of the form "YYYYMMDDTHHMMSSZ" or "YYYYMMDDTHHMMSS.<frac>Z
00433       initialAbsoluteSeekTime = argv[2];
00434       ++argv; --argc;
00435       break;
00436     }
00437 
00438     case 'z': { // scale (trick play)
00439       float arg;
00440       if (sscanf(argv[2], "%g", &arg) != 1 || arg == 0.0f) {
00441         usage();
00442       }
00443       scale = arg;
00444       ++argv; --argc;
00445       break;
00446     }
00447 
00448     default: {
00449       usage();
00450       break;
00451     }
00452     }
00453 
00454     ++argv; --argc;
00455   }
00456   if (argc != 2) usage(); // there must be exactly one "rtsp://" URL at the end
00457   if (outputQuickTimeFile && outputAVIFile) {
00458     *env << "The -i and -q (or -4) options cannot both be used!\n";
00459     usage();
00460   }
00461   Boolean outputCompositeFile = outputQuickTimeFile || outputAVIFile;
00462   if (!createReceivers && outputCompositeFile) {
00463     *env << "The -r and -q (or -4 or -i) options cannot both be used!\n";
00464     usage();
00465   }
00466   if (outputCompositeFile && !movieWidthOptionSet) {
00467     *env << "Warning: The -q, -4 or -i option was used, but not -w.  Assuming a video width of "
00468          << movieWidth << " pixels\n";
00469   }
00470   if (outputCompositeFile && !movieHeightOptionSet) {
00471     *env << "Warning: The -q, -4 or -i option was used, but not -h.  Assuming a video height of "
00472          << movieHeight << " pixels\n";
00473   }
00474   if (outputCompositeFile && !movieFPSOptionSet) {
00475     *env << "Warning: The -q, -4 or -i option was used, but not -f.  Assuming a video frame rate of "
00476          << movieFPS << " frames-per-second\n";
00477   }
00478   if (audioOnly && videoOnly) {
00479     *env << "The -a and -v options cannot both be used!\n";
00480     usage();
00481   }
00482   if (sendOptionsRequestOnly && !sendOptionsRequest) {
00483     *env << "The -o and -O options cannot both be used!\n";
00484     usage();
00485   }
00486   if (initialAbsoluteSeekTime != NULL && initialSeekTime != 0.0f) {
00487     *env << "The -s and -U options cannot both be used!\n";
00488     usage();
00489   }
00490   if (tunnelOverHTTPPortNum > 0) {
00491     if (streamUsingTCP) {
00492       *env << "The -t and -T options cannot both be used!\n";
00493       usage();
00494     } else {
00495       streamUsingTCP = True;
00496     }
00497   }
00498   if (!createReceivers && notifyOnPacketArrival) {
00499     *env << "Warning: Because we're not receiving stream data, the -n flag has no effect\n";
00500   }
00501   if (durationSlop < 0) {
00502     // This parameter wasn't set, so use a default value.
00503     // If we're measuring QOS stats, then don't add any slop, to avoid
00504     // having 'empty' measurement intervals at the end.
00505     durationSlop = qosMeasurementIntervalMS > 0 ? 0.0 : 5.0;
00506   }
00507 
00508   streamURL = argv[1];
00509 
00510   // Create our client object:
00511   ourClient = createClient(*env, streamURL, verbosityLevel, progName);
00512   if (ourClient == NULL) {
00513     *env << "Failed to create " << clientProtocolName
00514                 << " client: " << env->getResultMsg() << "\n";
00515     shutdown();
00516   }
00517 
00518   if (sendOptionsRequest) {
00519     // Begin by sending an "OPTIONS" command:
00520     getOptions(continueAfterOPTIONS);
00521   } else {
00522     continueAfterOPTIONS(NULL, 0, NULL);
00523   }
00524 
00525   // All subsequent activity takes place within the event loop:
00526   env->taskScheduler().doEventLoop(); // does not return
00527 
00528   return 0; // only to prevent compiler warning
00529 }
00530 
00531 void continueAfterOPTIONS(RTSPClient*, int resultCode, char* resultString) {
00532   if (sendOptionsRequestOnly) {
00533     if (resultCode != 0) {
00534       *env << clientProtocolName << " \"OPTIONS\" request failed: " << resultString << "\n";
00535     } else {
00536       *env << clientProtocolName << " \"OPTIONS\" request returned: " << resultString << "\n";
00537     }
00538     shutdown();
00539   }
00540   delete[] resultString;
00541 
00542   // Next, get a SDP description for the stream:
00543   getSDPDescription(continueAfterDESCRIBE);
00544 }
00545 
00546 void continueAfterDESCRIBE(RTSPClient*, int resultCode, char* resultString) {
00547   if (resultCode != 0) {
00548     *env << "Failed to get a SDP description for the URL \"" << streamURL << "\": " << resultString << "\n";
00549     delete[] resultString;
00550     shutdown();
00551   }
00552 
00553   char* sdpDescription = resultString;
00554   *env << "Opened URL \"" << streamURL << "\", returning a SDP description:\n" << sdpDescription << "\n";
00555 
00556   // Create a media session object from this SDP description:
00557   session = MediaSession::createNew(*env, sdpDescription);
00558   delete[] sdpDescription;
00559   if (session == NULL) {
00560     *env << "Failed to create a MediaSession object from the SDP description: " << env->getResultMsg() << "\n";
00561     shutdown();
00562   } else if (!session->hasSubsessions()) {
00563     *env << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
00564     shutdown();
00565   }
00566 
00567   // Then, setup the "RTPSource"s for the session:
00568   MediaSubsessionIterator iter(*session);
00569   MediaSubsession *subsession;
00570   Boolean madeProgress = False;
00571   char const* singleMediumToTest = singleMedium;
00572   while ((subsession = iter.next()) != NULL) {
00573     // If we've asked to receive only a single medium, then check this now:
00574     if (singleMediumToTest != NULL) {
00575       if (strcmp(subsession->mediumName(), singleMediumToTest) != 0) {
00576                   *env << "Ignoring \"" << subsession->mediumName()
00577                           << "/" << subsession->codecName()
00578                           << "\" subsession, because we've asked to receive a single " << singleMedium
00579                           << " session only\n";
00580         continue;
00581       } else {
00582         // Receive this subsession only
00583         singleMediumToTest = "xxxxx";
00584             // this hack ensures that we get only 1 subsession of this type
00585       }
00586     }
00587 
00588     if (desiredPortNum != 0) {
00589       subsession->setClientPortNum(desiredPortNum);
00590       desiredPortNum += 2;
00591     }
00592 
00593     if (createReceivers) {
00594       if (!subsession->initiate(simpleRTPoffsetArg)) {
00595         *env << "Unable to create receiver for \"" << subsession->mediumName()
00596              << "/" << subsession->codecName()
00597              << "\" subsession: " << env->getResultMsg() << "\n";
00598       } else {
00599         *env << "Created receiver for \"" << subsession->mediumName()
00600              << "/" << subsession->codecName()
00601              << "\" subsession (client ports " << subsession->clientPortNum()
00602              << "-" << subsession->clientPortNum()+1 << ")\n";
00603         madeProgress = True;
00604         
00605         if (subsession->rtpSource() != NULL) {
00606           // Because we're saving the incoming data, rather than playing
00607           // it in real time, allow an especially large time threshold
00608           // (1 second) for reordering misordered incoming packets:
00609           unsigned const thresh = 1000000; // 1 second
00610           subsession->rtpSource()->setPacketReorderingThresholdTime(thresh);
00611           
00612           // Set the RTP source's OS socket buffer size as appropriate - either if we were explicitly asked (using -B),
00613           // or if the desired FileSink buffer size happens to be larger than the current OS socket buffer size.
00614           // (The latter case is a heuristic, on the assumption that if the user asked for a large FileSink buffer size,
00615           // then the input data rate may be large enough to justify increasing the OS socket buffer size also.)
00616           int socketNum = subsession->rtpSource()->RTPgs()->socketNum();
00617           unsigned curBufferSize = getReceiveBufferSize(*env, socketNum);
00618           if (socketInputBufferSize > 0 || fileSinkBufferSize > curBufferSize) {
00619             unsigned newBufferSize = socketInputBufferSize > 0 ? socketInputBufferSize : fileSinkBufferSize;
00620             newBufferSize = setReceiveBufferTo(*env, socketNum, newBufferSize);
00621             if (socketInputBufferSize > 0) { // The user explicitly asked for the new socket buffer size; announce it:
00622               *env << "Changed socket receive buffer size for the \""
00623                    << subsession->mediumName()
00624                    << "/" << subsession->codecName()
00625                    << "\" subsession from "
00626                    << curBufferSize << " to "
00627                    << newBufferSize << " bytes\n";
00628             }
00629           }
00630         }
00631       }
00632     } else {
00633       if (subsession->clientPortNum() == 0) {
00634         *env << "No client port was specified for the \""
00635              << subsession->mediumName()
00636              << "/" << subsession->codecName()
00637              << "\" subsession.  (Try adding the \"-p <portNum>\" option.)\n";
00638       } else {
00639                 madeProgress = True;
00640       }
00641     }
00642   }
00643   if (!madeProgress) shutdown();
00644 
00645   // Perform additional 'setup' on each subsession, before playing them:
00646   setupStreams();
00647 }
00648 
00649 MediaSubsession *subsession;
00650 Boolean madeProgress = False;
00651 void continueAfterSETUP(RTSPClient*, int resultCode, char* resultString) {
00652   if (resultCode == 0) {
00653       *env << "Setup \"" << subsession->mediumName()
00654            << "/" << subsession->codecName()
00655            << "\" subsession (client ports " << subsession->clientPortNum()
00656            << "-" << subsession->clientPortNum()+1 << ")\n";
00657       madeProgress = True;
00658   } else {
00659     *env << "Failed to setup \"" << subsession->mediumName()
00660          << "/" << subsession->codecName()
00661          << "\" subsession: " << resultString << "\n";
00662   }
00663   delete[] resultString;
00664 
00665   // Set up the next subsession, if any:
00666   setupStreams();
00667 }
00668 
00669 void setupStreams() {
00670   static MediaSubsessionIterator* setupIter = NULL;
00671   if (setupIter == NULL) setupIter = new MediaSubsessionIterator(*session);
00672   while ((subsession = setupIter->next()) != NULL) {
00673     // We have another subsession left to set up:
00674     if (subsession->clientPortNum() == 0) continue; // port # was not set
00675 
00676     setupSubsession(subsession, streamUsingTCP, continueAfterSETUP);
00677     return;
00678   }
00679 
00680   // We're done setting up subsessions.
00681   delete setupIter;
00682   if (!madeProgress) shutdown();
00683 
00684   // Create output files:
00685   if (createReceivers) {
00686     if (outputQuickTimeFile) {
00687       // Create a "QuickTimeFileSink", to write to 'stdout':
00688       qtOut = QuickTimeFileSink::createNew(*env, *session, "stdout",
00689                                            fileSinkBufferSize,
00690                                            movieWidth, movieHeight,
00691                                            movieFPS,
00692                                            packetLossCompensate,
00693                                            syncStreams,
00694                                            generateHintTracks,
00695                                            generateMP4Format);
00696       if (qtOut == NULL) {
00697         *env << "Failed to create QuickTime file sink for stdout: " << env->getResultMsg();
00698         shutdown();
00699       }
00700 
00701       qtOut->startPlaying(sessionAfterPlaying, NULL);
00702     } else if (outputAVIFile) {
00703       // Create an "AVIFileSink", to write to 'stdout':
00704       aviOut = AVIFileSink::createNew(*env, *session, "stdout",
00705                                       fileSinkBufferSize,
00706                                       movieWidth, movieHeight,
00707                                       movieFPS,
00708                                       packetLossCompensate);
00709       if (aviOut == NULL) {
00710         *env << "Failed to create AVI file sink for stdout: " << env->getResultMsg();
00711         shutdown();
00712       }
00713 
00714       aviOut->startPlaying(sessionAfterPlaying, NULL);
00715     } else {
00716       // Create and start "FileSink"s for each subsession:
00717       madeProgress = False;
00718       MediaSubsessionIterator iter(*session);
00719       while ((subsession = iter.next()) != NULL) {
00720         if (subsession->readSource() == NULL) continue; // was not initiated
00721 
00722         // Create an output file for each desired stream:
00723         char outFileName[1000];
00724         if (singleMedium == NULL) {
00725           // Output file name is
00726           //     "<filename-prefix><medium_name>-<codec_name>-<counter>"
00727           static unsigned streamCounter = 0;
00728           snprintf(outFileName, sizeof outFileName, "%s%s-%s-%d",
00729                    fileNamePrefix, subsession->mediumName(),
00730                    subsession->codecName(), ++streamCounter);
00731         } else {
00732           sprintf(outFileName, "stdout");
00733         }
00734         FileSink* fileSink;
00735         if (strcmp(subsession->mediumName(), "audio") == 0 &&
00736             (strcmp(subsession->codecName(), "AMR") == 0 ||
00737              strcmp(subsession->codecName(), "AMR-WB") == 0)) {
00738           // For AMR audio streams, we use a special sink that inserts AMR frame hdrs:
00739           fileSink = AMRAudioFileSink::createNew(*env, outFileName,
00740                                                  fileSinkBufferSize, oneFilePerFrame);
00741         } else if (strcmp(subsession->mediumName(), "video") == 0 &&
00742             (strcmp(subsession->codecName(), "H264") == 0)) {
00743           // For H.264 video stream, we use a special sink that adds 'start codes', and (at the start) the SPS and PPS NAL units:
00744           fileSink = H264VideoFileSink::createNew(*env, outFileName,
00745                                                   subsession->fmtp_spropparametersets(),
00746                                                   fileSinkBufferSize, oneFilePerFrame);
00747         } else {
00748           // Normal case:
00749           fileSink = FileSink::createNew(*env, outFileName,
00750                                          fileSinkBufferSize, oneFilePerFrame);
00751         }
00752         subsession->sink = fileSink;
00753         if (subsession->sink == NULL) {
00754           *env << "Failed to create FileSink for \"" << outFileName
00755                   << "\": " << env->getResultMsg() << "\n";
00756         } else {
00757           if (singleMedium == NULL) {
00758             *env << "Created output file: \"" << outFileName << "\"\n";
00759           } else {
00760             *env << "Outputting data from the \"" << subsession->mediumName()
00761                         << "/" << subsession->codecName()
00762                         << "\" subsession to 'stdout'\n";
00763           }
00764 
00765           if (strcmp(subsession->mediumName(), "video") == 0 &&
00766               strcmp(subsession->codecName(), "MP4V-ES") == 0 &&
00767               subsession->fmtp_config() != NULL) {
00768             // For MPEG-4 video RTP streams, the 'config' information
00769             // from the SDP description contains useful VOL etc. headers.
00770             // Insert this data at the front of the output file:
00771             unsigned configLen;
00772             unsigned char* configData
00773               = parseGeneralConfigStr(subsession->fmtp_config(), configLen);
00774             struct timeval timeNow;
00775             gettimeofday(&timeNow, NULL);
00776             fileSink->addData(configData, configLen, timeNow);
00777             delete[] configData;
00778           }
00779 
00780           subsession->sink->startPlaying(*(subsession->readSource()),
00781                                          subsessionAfterPlaying,
00782                                          subsession);
00783 
00784           // Also set a handler to be called if a RTCP "BYE" arrives
00785           // for this subsession:
00786           if (subsession->rtcpInstance() != NULL) {
00787             subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, subsession);
00788           }
00789 
00790           madeProgress = True;
00791         }
00792       }
00793       if (!madeProgress) shutdown();
00794     }
00795   }
00796 
00797   // Finally, start playing each subsession, to start the data flow:
00798   if (duration == 0) {
00799     if (scale > 0) duration = session->playEndTime() - initialSeekTime; // use SDP end time
00800     else if (scale < 0) duration = initialSeekTime;
00801   }
00802   if (duration < 0) duration = 0.0;
00803 
00804   endTime = initialSeekTime;
00805   if (scale > 0) {
00806     if (duration <= 0) endTime = -1.0f;
00807     else endTime = initialSeekTime + duration;
00808   } else {
00809     endTime = initialSeekTime - duration;
00810     if (endTime < 0) endTime = 0.0f;
00811   }
00812 
00813   char const* absStartTime = initialAbsoluteSeekTime != NULL ? initialAbsoluteSeekTime : session->absStartTime();
00814   if (absStartTime != NULL) {
00815     // Either we or the server have specified that seeking should be done by 'absolute' time:
00816     startPlayingSession(session, absStartTime, session->absEndTime(), scale, continueAfterPLAY);
00817   } else {
00818     // Normal case: Seek by relative time (NPT):
00819     startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
00820   }
00821 }
00822 
00823 void continueAfterPLAY(RTSPClient*, int resultCode, char* resultString) {
00824   if (resultCode != 0) {
00825     *env << "Failed to start playing session: " << resultString << "\n";
00826     delete[] resultString;
00827     shutdown();
00828   } else {
00829     *env << "Started playing session\n";
00830   }
00831   delete[] resultString;
00832 
00833   if (qosMeasurementIntervalMS > 0) {
00834     // Begin periodic QOS measurements:
00835     beginQOSMeasurement();
00836   }
00837 
00838   // Figure out how long to delay (if at all) before shutting down, or
00839   // repeating the playing
00840   Boolean timerIsBeingUsed = False;
00841   double secondsToDelay = duration;
00842   if (duration > 0) {
00843     // First, adjust "duration" based on any change to the play range (that was specified in the "PLAY" response):
00844     double rangeAdjustment = (session->playEndTime() - session->playStartTime()) - (endTime - initialSeekTime);
00845     if (duration + rangeAdjustment > 0.0) duration += rangeAdjustment;
00846 
00847     timerIsBeingUsed = True;
00848     double absScale = scale > 0 ? scale : -scale; // ASSERT: scale != 0
00849     secondsToDelay = duration/absScale + durationSlop;
00850 
00851     int64_t uSecsToDelay = (int64_t)(secondsToDelay*1000000.0);
00852     sessionTimerTask = env->taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)sessionTimerHandler, (void*)NULL);
00853   }
00854 
00855   char const* actionString
00856     = createReceivers? "Receiving streamed data":"Data is being streamed";
00857   if (timerIsBeingUsed) {
00858     *env << actionString
00859                 << " (for up to " << secondsToDelay
00860                 << " seconds)...\n";
00861   } else {
00862 #ifdef USE_SIGNALS
00863     pid_t ourPid = getpid();
00864     *env << actionString
00865                 << " (signal with \"kill -HUP " << (int)ourPid
00866                 << "\" or \"kill -USR1 " << (int)ourPid
00867                 << "\" to terminate)...\n";
00868 #else
00869     *env << actionString << "...\n";
00870 #endif
00871   }
00872 
00873   // Watch for incoming packets (if desired):
00874   checkForPacketArrival(NULL);
00875   checkInterPacketGaps(NULL);
00876 }
00877 
00878 void closeMediaSinks() {
00879   Medium::close(qtOut);
00880   Medium::close(aviOut);
00881 
00882   if (session == NULL) return;
00883   MediaSubsessionIterator iter(*session);
00884   MediaSubsession* subsession;
00885   while ((subsession = iter.next()) != NULL) {
00886     Medium::close(subsession->sink);
00887     subsession->sink = NULL;
00888   }
00889 }
00890 
00891 void subsessionAfterPlaying(void* clientData) {
00892   // Begin by closing this media subsession's stream:
00893   MediaSubsession* subsession = (MediaSubsession*)clientData;
00894   Medium::close(subsession->sink);
00895   subsession->sink = NULL;
00896 
00897   // Next, check whether *all* subsessions' streams have now been closed:
00898   MediaSession& session = subsession->parentSession();
00899   MediaSubsessionIterator iter(session);
00900   while ((subsession = iter.next()) != NULL) {
00901     if (subsession->sink != NULL) return; // this subsession is still active
00902   }
00903 
00904   // All subsessions' streams have now been closed
00905   sessionAfterPlaying();
00906 }
00907 
00908 void subsessionByeHandler(void* clientData) {
00909   struct timeval timeNow;
00910   gettimeofday(&timeNow, NULL);
00911   unsigned secsDiff = timeNow.tv_sec - startTime.tv_sec;
00912 
00913   MediaSubsession* subsession = (MediaSubsession*)clientData;
00914   *env << "Received RTCP \"BYE\" on \"" << subsession->mediumName()
00915         << "/" << subsession->codecName()
00916         << "\" subsession (after " << secsDiff
00917         << " seconds)\n";
00918 
00919   // Act now as if the subsession had closed:
00920   subsessionAfterPlaying(subsession);
00921 }
00922 
00923 void sessionAfterPlaying(void* /*clientData*/) {
00924   if (!playContinuously) {
00925     shutdown(0);
00926   } else {
00927     // We've been asked to play the stream(s) over again.
00928     // First, reset state from the current session:
00929     if (env != NULL) {
00930       env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
00931       env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
00932       env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
00933       env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);
00934     }
00935     totNumPacketsReceived = ~0;
00936 
00937     startPlayingSession(session, initialSeekTime, endTime, scale, continueAfterPLAY);
00938   }
00939 }
00940 
00941 void sessionTimerHandler(void* /*clientData*/) {
00942   sessionTimerTask = NULL;
00943 
00944   sessionAfterPlaying();
00945 }
00946 
00947 class qosMeasurementRecord {
00948 public:
00949   qosMeasurementRecord(struct timeval const& startTime, RTPSource* src)
00950     : fSource(src), fNext(NULL),
00951       kbits_per_second_min(1e20), kbits_per_second_max(0),
00952       kBytesTotal(0.0),
00953       packet_loss_fraction_min(1.0), packet_loss_fraction_max(0.0),
00954       totNumPacketsReceived(0), totNumPacketsExpected(0) {
00955     measurementEndTime = measurementStartTime = startTime;
00956 
00957     RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
00958     // Assume that there's only one SSRC source (usually the case):
00959     RTPReceptionStats* stats = statsIter.next(True);
00960     if (stats != NULL) {
00961       kBytesTotal = stats->totNumKBytesReceived();
00962       totNumPacketsReceived = stats->totNumPacketsReceived();
00963       totNumPacketsExpected = stats->totNumPacketsExpected();
00964     }
00965   }
00966   virtual ~qosMeasurementRecord() { delete fNext; }
00967 
00968   void periodicQOSMeasurement(struct timeval const& timeNow);
00969 
00970 public:
00971   RTPSource* fSource;
00972   qosMeasurementRecord* fNext;
00973 
00974 public:
00975   struct timeval measurementStartTime, measurementEndTime;
00976   double kbits_per_second_min, kbits_per_second_max;
00977   double kBytesTotal;
00978   double packet_loss_fraction_min, packet_loss_fraction_max;
00979   unsigned totNumPacketsReceived, totNumPacketsExpected;
00980 };
00981 
00982 static qosMeasurementRecord* qosRecordHead = NULL;
00983 
00984 static void periodicQOSMeasurement(void* clientData); // forward
00985 
00986 static unsigned nextQOSMeasurementUSecs;
00987 
00988 static void scheduleNextQOSMeasurement() {
00989   nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000;
00990   struct timeval timeNow;
00991   gettimeofday(&timeNow, NULL);
00992   unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec;
00993   unsigned usecsToDelay = nextQOSMeasurementUSecs - timeNowUSecs;
00994      // Note: This works even when nextQOSMeasurementUSecs wraps around
00995 
00996   qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask(
00997      usecsToDelay, (TaskFunc*)periodicQOSMeasurement, (void*)NULL);
00998 }
00999 
01000 static void periodicQOSMeasurement(void* /*clientData*/) {
01001   struct timeval timeNow;
01002   gettimeofday(&timeNow, NULL);
01003 
01004   for (qosMeasurementRecord* qosRecord = qosRecordHead;
01005        qosRecord != NULL; qosRecord = qosRecord->fNext) {
01006     qosRecord->periodicQOSMeasurement(timeNow);
01007   }
01008 
01009   // Do this again later:
01010   scheduleNextQOSMeasurement();
01011 }
01012 
01013 void qosMeasurementRecord
01014 ::periodicQOSMeasurement(struct timeval const& timeNow) {
01015   unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec;
01016   int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec;
01017   double timeDiff = secsDiff + usecsDiff/1000000.0;
01018   measurementEndTime = timeNow;
01019 
01020   RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB());
01021   // Assume that there's only one SSRC source (usually the case):
01022   RTPReceptionStats* stats = statsIter.next(True);
01023   if (stats != NULL) {
01024     double kBytesTotalNow = stats->totNumKBytesReceived();
01025     double kBytesDeltaNow = kBytesTotalNow - kBytesTotal;
01026     kBytesTotal = kBytesTotalNow;
01027 
01028     double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff;
01029     if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error
01030     if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow;
01031     if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow;
01032 
01033     unsigned totReceivedNow = stats->totNumPacketsReceived();
01034     unsigned totExpectedNow = stats->totNumPacketsExpected();
01035     unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived;
01036     unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected;
01037     totNumPacketsReceived = totReceivedNow;
01038     totNumPacketsExpected = totExpectedNow;
01039 
01040     double lossFractionNow = deltaExpectedNow == 0 ? 0.0
01041       : 1.0 - deltaReceivedNow/(double)deltaExpectedNow;
01042     //if (lossFractionNow < 0.0) lossFractionNow = 0.0; //reordering can cause
01043     if (lossFractionNow < packet_loss_fraction_min) {
01044       packet_loss_fraction_min = lossFractionNow;
01045     }
01046     if (lossFractionNow > packet_loss_fraction_max) {
01047       packet_loss_fraction_max = lossFractionNow;
01048     }
01049   }
01050 }
01051 
01052 void beginQOSMeasurement() {
01053   // Set up a measurement record for each active subsession:
01054   struct timeval startTime;
01055   gettimeofday(&startTime, NULL);
01056   nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec;
01057   qosMeasurementRecord* qosRecordTail = NULL;
01058   MediaSubsessionIterator iter(*session);
01059   MediaSubsession* subsession;
01060   while ((subsession = iter.next()) != NULL) {
01061     RTPSource* src = subsession->rtpSource();
01062     if (src == NULL) continue;
01063 
01064     qosMeasurementRecord* qosRecord
01065       = new qosMeasurementRecord(startTime, src);
01066     if (qosRecordHead == NULL) qosRecordHead = qosRecord;
01067     if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord;
01068     qosRecordTail  = qosRecord;
01069   }
01070 
01071   // Then schedule the first of the periodic measurements:
01072   scheduleNextQOSMeasurement();
01073 }
01074 
01075 void printQOSData(int exitCode) {
01076   *env << "begin_QOS_statistics\n";
01077   
01078   // Print out stats for each active subsession:
01079   qosMeasurementRecord* curQOSRecord = qosRecordHead;
01080   if (session != NULL) {
01081     MediaSubsessionIterator iter(*session);
01082     MediaSubsession* subsession;
01083     while ((subsession = iter.next()) != NULL) {
01084       RTPSource* src = subsession->rtpSource();
01085       if (src == NULL) continue;
01086       
01087       *env << "subsession\t" << subsession->mediumName()
01088            << "/" << subsession->codecName() << "\n";
01089       
01090       unsigned numPacketsReceived = 0, numPacketsExpected = 0;
01091       
01092       if (curQOSRecord != NULL) {
01093         numPacketsReceived = curQOSRecord->totNumPacketsReceived;
01094         numPacketsExpected = curQOSRecord->totNumPacketsExpected;
01095       }
01096       *env << "num_packets_received\t" << numPacketsReceived << "\n";
01097       *env << "num_packets_lost\t" << int(numPacketsExpected - numPacketsReceived) << "\n";
01098       
01099       if (curQOSRecord != NULL) {
01100         unsigned secsDiff = curQOSRecord->measurementEndTime.tv_sec
01101           - curQOSRecord->measurementStartTime.tv_sec;
01102         int usecsDiff = curQOSRecord->measurementEndTime.tv_usec
01103           - curQOSRecord->measurementStartTime.tv_usec;
01104         double measurementTime = secsDiff + usecsDiff/1000000.0;
01105         *env << "elapsed_measurement_time\t" << measurementTime << "\n";
01106         
01107         *env << "kBytes_received_total\t" << curQOSRecord->kBytesTotal << "\n";
01108         
01109         *env << "measurement_sampling_interval_ms\t" << qosMeasurementIntervalMS << "\n";
01110         
01111         if (curQOSRecord->kbits_per_second_max == 0) {
01112           // special case: we didn't receive any data:
01113           *env <<
01114             "kbits_per_second_min\tunavailable\n"
01115             "kbits_per_second_ave\tunavailable\n"
01116             "kbits_per_second_max\tunavailable\n";
01117         } else {
01118           *env << "kbits_per_second_min\t" << curQOSRecord->kbits_per_second_min << "\n";
01119           *env << "kbits_per_second_ave\t"
01120                << (measurementTime == 0.0 ? 0.0 : 8*curQOSRecord->kBytesTotal/measurementTime) << "\n";
01121           *env << "kbits_per_second_max\t" << curQOSRecord->kbits_per_second_max << "\n";
01122         }
01123         
01124         *env << "packet_loss_percentage_min\t" << 100*curQOSRecord->packet_loss_fraction_min << "\n";
01125         double packetLossFraction = numPacketsExpected == 0 ? 1.0
01126           : 1.0 - numPacketsReceived/(double)numPacketsExpected;
01127         if (packetLossFraction < 0.0) packetLossFraction = 0.0;
01128         *env << "packet_loss_percentage_ave\t" << 100*packetLossFraction << "\n";
01129         *env << "packet_loss_percentage_max\t"
01130              << (packetLossFraction == 1.0 ? 100.0 : 100*curQOSRecord->packet_loss_fraction_max) << "\n";
01131         
01132         RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB());
01133         // Assume that there's only one SSRC source (usually the case):
01134         RTPReceptionStats* stats = statsIter.next(True);
01135         if (stats != NULL) {
01136           *env << "inter_packet_gap_ms_min\t" << stats->minInterPacketGapUS()/1000.0 << "\n";
01137           struct timeval totalGaps = stats->totalInterPacketGaps();
01138           double totalGapsMS = totalGaps.tv_sec*1000.0 + totalGaps.tv_usec/1000.0;
01139           unsigned totNumPacketsReceived = stats->totNumPacketsReceived();
01140           *env << "inter_packet_gap_ms_ave\t"
01141                << (totNumPacketsReceived == 0 ? 0.0 : totalGapsMS/totNumPacketsReceived) << "\n";
01142           *env << "inter_packet_gap_ms_max\t" << stats->maxInterPacketGapUS()/1000.0 << "\n";
01143         }
01144         
01145         curQOSRecord = curQOSRecord->fNext;
01146       }
01147     }
01148   }
01149 
01150   *env << "end_QOS_statistics\n";
01151   delete qosRecordHead;
01152 }
01153 
01154 Boolean areAlreadyShuttingDown = False;
01155 int shutdownExitCode;
01156 void shutdown(int exitCode) {
01157   if (areAlreadyShuttingDown) return; // in case we're called after receiving a RTCP "BYE" while in the middle of a "TEARDOWN".
01158   areAlreadyShuttingDown = True;
01159 
01160   shutdownExitCode = exitCode;
01161   if (env != NULL) {
01162     env->taskScheduler().unscheduleDelayedTask(sessionTimerTask);
01163     env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask);
01164     env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask);
01165     env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask);
01166   }
01167 
01168   if (qosMeasurementIntervalMS > 0) {
01169     printQOSData(exitCode);
01170   }
01171 
01172   // Teardown, then shutdown, any outstanding RTP/RTCP subsessions
01173   if (session != NULL) {
01174     tearDownSession(session, continueAfterTEARDOWN);
01175   } else {
01176     continueAfterTEARDOWN(NULL, 0, NULL);
01177   }
01178 }
01179 
01180 void continueAfterTEARDOWN(RTSPClient*, int /*resultCode*/, char* resultString) {
01181   delete[] resultString;
01182 
01183   // Now that we've stopped any more incoming data from arriving, close our output files:
01184   closeMediaSinks();
01185   Medium::close(session);
01186 
01187   // Finally, shut down our client:
01188   delete ourAuthenticator;
01189   Medium::close(ourClient);
01190 
01191   // Adios...
01192   exit(shutdownExitCode);
01193 }
01194 
01195 void signalHandlerShutdown(int /*sig*/) {
01196   *env << "Got shutdown signal\n";
01197   shutdown(0);
01198 }
01199 
01200 void checkForPacketArrival(void* /*clientData*/) {
01201   if (!notifyOnPacketArrival) return; // we're not checking
01202 
01203   // Check each subsession, to see whether it has received data packets:
01204   unsigned numSubsessionsChecked = 0;
01205   unsigned numSubsessionsWithReceivedData = 0;
01206   unsigned numSubsessionsThatHaveBeenSynced = 0;
01207 
01208   MediaSubsessionIterator iter(*session);
01209   MediaSubsession* subsession;
01210   while ((subsession = iter.next()) != NULL) {
01211     RTPSource* src = subsession->rtpSource();
01212     if (src == NULL) continue;
01213     ++numSubsessionsChecked;
01214 
01215     if (src->receptionStatsDB().numActiveSourcesSinceLastReset() > 0) {
01216       // At least one data packet has arrived
01217       ++numSubsessionsWithReceivedData;
01218     }
01219     if (src->hasBeenSynchronizedUsingRTCP()) {
01220       ++numSubsessionsThatHaveBeenSynced;
01221     }
01222   }
01223 
01224   unsigned numSubsessionsToCheck = numSubsessionsChecked;
01225   // Special case for "QuickTimeFileSink"s and "AVIFileSink"s:
01226   // They might not use all of the input sources:
01227   if (qtOut != NULL) {
01228     numSubsessionsToCheck = qtOut->numActiveSubsessions();
01229   } else if (aviOut != NULL) {
01230     numSubsessionsToCheck = aviOut->numActiveSubsessions();
01231   }
01232 
01233   Boolean notifyTheUser;
01234   if (!syncStreams) {
01235     notifyTheUser = numSubsessionsWithReceivedData > 0; // easy case
01236   } else {
01237     notifyTheUser = numSubsessionsWithReceivedData >= numSubsessionsToCheck
01238       && numSubsessionsThatHaveBeenSynced == numSubsessionsChecked;
01239     // Note: A subsession with no active sources is considered to be synced
01240   }
01241   if (notifyTheUser) {
01242     struct timeval timeNow;
01243     gettimeofday(&timeNow, NULL);
01244         char timestampStr[100];
01245         sprintf(timestampStr, "%ld%03ld", timeNow.tv_sec, (long)(timeNow.tv_usec/1000));
01246     *env << (syncStreams ? "Synchronized d" : "D")
01247                 << "ata packets have begun arriving [" << timestampStr << "]\007\n";
01248     return;
01249   }
01250 
01251   // No luck, so reschedule this check again, after a delay:
01252   int uSecsToDelay = 100000; // 100 ms
01253   arrivalCheckTimerTask
01254     = env->taskScheduler().scheduleDelayedTask(uSecsToDelay,
01255                                (TaskFunc*)checkForPacketArrival, NULL);
01256 }
01257 
01258 void checkInterPacketGaps(void* /*clientData*/) {
01259   if (interPacketGapMaxTime == 0) return; // we're not checking
01260 
01261   // Check each subsession, counting up how many packets have been received:
01262   unsigned newTotNumPacketsReceived = 0;
01263 
01264   MediaSubsessionIterator iter(*session);
01265   MediaSubsession* subsession;
01266   while ((subsession = iter.next()) != NULL) {
01267     RTPSource* src = subsession->rtpSource();
01268     if (src == NULL) continue;
01269     newTotNumPacketsReceived += src->receptionStatsDB().totNumPacketsReceived();
01270   }
01271 
01272   if (newTotNumPacketsReceived == totNumPacketsReceived) {
01273     // No additional packets have been received since the last time we
01274     // checked, so end this stream:
01275     *env << "Closing session, because we stopped receiving packets.\n";
01276     interPacketGapCheckTimerTask = NULL;
01277     sessionAfterPlaying();
01278   } else {
01279     totNumPacketsReceived = newTotNumPacketsReceived;
01280     // Check again, after the specified delay:
01281     interPacketGapCheckTimerTask
01282       = env->taskScheduler().scheduleDelayedTask(interPacketGapMaxTime*1000000,
01283                                  (TaskFunc*)checkInterPacketGaps, NULL);
01284   }
01285 }

Generated on Mon Apr 29 13:28:03 2013 for live by  doxygen 1.5.2