liveMedia/ProxyServerMediaSession.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2013 Live Networks, Inc.  All rights reserved.
00018 // A subclass of "ServerMediaSession" that can be used to create a (unicast) RTSP servers that acts as a 'proxy' for
00019 // another (unicast or multicast) RTSP/RTP stream.
00020 // Implementation
00021 
00022 #include "liveMedia.hh"
00023 #include "RTSPCommon.hh"
00024 #include "GroupsockHelper.hh" // for "our_random()"
00025 
00026 #ifndef MILLION
00027 #define MILLION 1000000
00028 #endif
00029 
00030 // A "OnDemandServerMediaSubsession" subclass, used to implement a unicast RTSP server that's proxying another RTSP stream:
00031 
00032 class ProxyServerMediaSubsession: public OnDemandServerMediaSubsession {
00033 public:
00034   ProxyServerMediaSubsession(MediaSubsession& mediaSubsession);
00035   virtual ~ProxyServerMediaSubsession();
00036 
00037   char const* codecName() const { return fClientMediaSubsession.codecName(); }
00038 
00039 private: // redefined virtual functions
00040   virtual FramedSource* createNewStreamSource(unsigned clientSessionId,
00041                                               unsigned& estBitrate);
00042   virtual void closeStreamSource(FramedSource *inputSource);
00043   virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,
00044                                     unsigned char rtpPayloadTypeIfDynamic,
00045                                     FramedSource* inputSource);
00046 
00047 private:
00048   static void subsessionByeHandler(void* clientData);
00049   void subsessionByeHandler();
00050 
00051   int verbosityLevel() const { return ((ProxyServerMediaSession*)fParentSession)->fVerbosityLevel; }
00052 
00053 private:
00054   friend class ProxyRTSPClient;
00055   MediaSubsession& fClientMediaSubsession; // the 'client' media subsession object that corresponds to this 'server' media subsession
00056   ProxyServerMediaSubsession* fNext; // used when we're part of a queue
00057   Boolean fHaveSetupStream;
00058 };
00059 
00060 
00062 
00063 UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSession& psms) { // used for debugging
00064   return env << "ProxyServerMediaSession[\"" << psms.url() << "\"]";
00065 }
00066 
00067 ProxyServerMediaSession* ProxyServerMediaSession
00068 ::createNew(UsageEnvironment& env, RTSPServer* ourRTSPServer,
00069             char const* inputStreamURL, char const* streamName,
00070             char const* username, char const* password,
00071             portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer) {
00072   return new ProxyServerMediaSession(env, ourRTSPServer, inputStreamURL, streamName, username, password,
00073                                      tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer);
00074 }
00075 
00076 
00077 ProxyServerMediaSession::ProxyServerMediaSession(UsageEnvironment& env, RTSPServer* ourRTSPServer,
00078                                                  char const* inputStreamURL, char const* streamName,
00079                                                  char const* username, char const* password,
00080                                                  portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer)
00081   : ServerMediaSession(env, streamName, NULL, NULL, False, NULL),
00082     describeCompletedFlag(0), fOurRTSPServer(ourRTSPServer), fClientMediaSession(NULL),
00083     fVerbosityLevel(verbosityLevel), fPresentationTimeSessionNormalizer(new PresentationTimeSessionNormalizer(envir())) {
00084   // Open a RTSP connection to the input stream, and send a "DESCRIBE" command.
00085   // We'll use the SDP description in the response to set ourselves up.
00086   fProxyRTSPClient = createNewProxyRTSPClient(inputStreamURL, username, password, tunnelOverHTTPPortNum,
00087                                               verbosityLevel > 0 ? verbosityLevel-1 : verbosityLevel, socketNumToServer);
00088   ProxyRTSPClient::sendDESCRIBE(fProxyRTSPClient);
00089 }
00090 
00091 ProxyServerMediaSession::~ProxyServerMediaSession() {
00092   if (fVerbosityLevel > 0) {
00093     envir() << *this << "::~ProxyServerMediaSession()\n";
00094   }
00095 
00096   // Begin by sending a "TEARDOWN" command (without checking for a response):
00097   if (fProxyRTSPClient != NULL) fProxyRTSPClient->sendTeardownCommand(*fClientMediaSession, NULL, fProxyRTSPClient->auth());
00098 
00099   // Then delete our state:
00100   Medium::close(fClientMediaSession);
00101   Medium::close(fProxyRTSPClient);
00102   delete fPresentationTimeSessionNormalizer;
00103 }
00104 
00105 char const* ProxyServerMediaSession::url() const {
00106   return fProxyRTSPClient == NULL ? NULL : fProxyRTSPClient->url();
00107 }
00108 
00109 ProxyRTSPClient* ProxyServerMediaSession
00110 ::createNewProxyRTSPClient(char const* rtspURL, char const* username, char const* password,
00111                            portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer){
00112   // default implementation:
00113   return new ProxyRTSPClient(*this, rtspURL, username, password, tunnelOverHTTPPortNum, verbosityLevel, socketNumToServer);
00114 }
00115 
00116 void ProxyServerMediaSession::continueAfterDESCRIBE(char const* sdpDescription) {
00117   describeCompletedFlag = 1;
00118 
00119   // Create a (client) "MediaSession" object from the stream's SDP description ("resultString"), then iterate through its
00120   // "MediaSubsession" objects, to set up corresponding "ServerMediaSubsession" objects that we'll use to serve the stream's tracks.
00121   do {
00122     fClientMediaSession = MediaSession::createNew(envir(), sdpDescription);
00123     if (fClientMediaSession == NULL) break;
00124 
00125     MediaSubsessionIterator iter(*fClientMediaSession);
00126     for (MediaSubsession* mss = iter.next(); mss != NULL; mss = iter.next()) {
00127       ServerMediaSubsession* smss = new ProxyServerMediaSubsession(*mss);
00128       addSubsession(smss);
00129       if (fVerbosityLevel > 0) {
00130         envir() << *this << " added new \"ProxyServerMediaSubsession\" for "
00131                 << mss->protocolName() << "/" << mss->mediumName() << "/" << mss->codecName() << " track\n";
00132       }
00133     }
00134   } while (0);
00135 }
00136 
00137 void ProxyServerMediaSession::resetDESCRIBEState() {
00138   // Delete all of our "ProxyServerMediaSubsession"s; they'll get set up again once we get a response to the new "DESCRIBE".
00139   if (fOurRTSPServer != NULL) {
00140     // First, close any RTSP client connections that may have already been set up:
00141     fOurRTSPServer->closeAllClientSessionsForServerMediaSession(this);
00142   }
00143   deleteAllSubsessions();
00144 
00145   // Finally, delete the client "MediaSession" object that we had set up after receiving the response to the previous "DESCRIBE":
00146   Medium::close(fClientMediaSession); fClientMediaSession = NULL;
00147 }
00148 
00150 
00151 static void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) {
00152   char const* res;
00153 
00154   if (resultCode == 0) {
00155     // The "DESCRIBE" command succeeded, so "resultString" should be the stream's SDP description.
00156     res = resultString;
00157   } else {
00158     // The "DESCRIBE" command failed.
00159     res = NULL;
00160   }
00161   ((ProxyRTSPClient*)rtspClient)->continueAfterDESCRIBE(res);
00162   delete[] resultString;
00163 }
00164 
00165 static void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) {
00166   if (resultCode == 0) {
00167     ((ProxyRTSPClient*)rtspClient)->continueAfterSETUP();
00168   }
00169   delete[] resultString;
00170 }
00171 
00172 static void continueAfterOPTIONS(RTSPClient* rtspClient, int resultCode, char* resultString) {
00173   Boolean serverSupportsGetParameter = False;
00174   if (resultCode == 0) {
00175     // Note whether the server told us that it supports the "GET_PARAMETER" command:
00176     serverSupportsGetParameter = RTSPOptionIsSupported("GET_PARAMETER", resultString);
00177   }
00178   ((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, serverSupportsGetParameter);
00179   delete[] resultString;
00180 }
00181 
00182 #ifdef SEND_GET_PARAMETER_IF_SUPPORTED
00183 static void continueAfterGET_PARAMETER(RTSPClient* rtspClient, int resultCode, char* resultString) {
00184   ((ProxyRTSPClient*)rtspClient)->continueAfterLivenessCommand(resultCode, True);
00185   delete[] resultString;
00186 }
00187 #endif
00188 
00189 
00191 
00192 UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyRTSPClient& proxyRTSPClient) { // used for debugging
00193   return env << "ProxyRTSPClient[\"" << proxyRTSPClient.url() << "\"]";
00194 }
00195 
00196 ProxyRTSPClient::ProxyRTSPClient(ProxyServerMediaSession& ourServerMediaSession, char const* rtspURL,
00197                                  char const* username, char const* password,
00198                                  portNumBits tunnelOverHTTPPortNum, int verbosityLevel, int socketNumToServer)
00199   : RTSPClient(ourServerMediaSession.envir(), rtspURL, verbosityLevel, "ProxyRTSPClient",
00200                tunnelOverHTTPPortNum == (portNumBits)(~0) ? 0 : tunnelOverHTTPPortNum, socketNumToServer),
00201     fOurServerMediaSession(ourServerMediaSession), fOurURL(strDup(rtspURL)), fStreamRTPOverTCP(tunnelOverHTTPPortNum != 0),
00202     fSetupQueueHead(NULL), fSetupQueueTail(NULL), fNumSetupsDone(0), fNextDESCRIBEDelay(1),
00203     fServerSupportsGetParameter(False), fLastCommandWasPLAY(False),
00204     fLivenessCommandTask(NULL), fDESCRIBECommandTask(NULL), fSubsessionTimerTask(NULL) { 
00205   if (username != NULL && password != NULL) {
00206     fOurAuthenticator = new Authenticator(username, password);
00207   } else {
00208     fOurAuthenticator = NULL;
00209   }
00210 }
00211 
00212 void ProxyRTSPClient::reset() {
00213   envir().taskScheduler().unscheduleDelayedTask(fLivenessCommandTask); fLivenessCommandTask = NULL;
00214   envir().taskScheduler().unscheduleDelayedTask(fDESCRIBECommandTask); fDESCRIBECommandTask = NULL;
00215   envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); fSubsessionTimerTask = NULL;
00216 
00217   fSetupQueueHead = fSetupQueueTail = NULL;
00218   fNumSetupsDone = 0;
00219   fNextDESCRIBEDelay = 1;
00220   fLastCommandWasPLAY = False;
00221 
00222   RTSPClient::reset();
00223 }
00224 
00225 ProxyRTSPClient::~ProxyRTSPClient() {
00226   reset();
00227 
00228   delete fOurAuthenticator;
00229   delete[] fOurURL;
00230 }
00231 
00232 void ProxyRTSPClient::continueAfterDESCRIBE(char const* sdpDescription) {
00233   if (sdpDescription != NULL) {
00234     fOurServerMediaSession.continueAfterDESCRIBE(sdpDescription);
00235 
00236     // Unlike most RTSP streams, there might be a long delay between this "DESCRIBE" command (to the downstream server) and the
00237     // subsequent "SETUP"/"PLAY" - which doesn't occur until the first time that a client requests the stream.
00238     // To prevent the proxied connection (between us and the downstream server) from timing out, we send periodic 'liveness'
00239     // ("OPTIONS" or "GET_PARAMETER") commands.  (The usual RTCP liveness mechanism wouldn't work here, because RTCP packets
00240     // don't get sent until after the "PLAY" command.)
00241     scheduleLivenessCommand();
00242   } else {
00243     // The "DESCRIBE" command failed, most likely because the server or the stream is not yet running.
00244     // Reschedule another "DESCRIBE" command to take place later:
00245     scheduleDESCRIBECommand();
00246   }
00247 }
00248 
00249 void ProxyRTSPClient::continueAfterLivenessCommand(int resultCode, Boolean serverSupportsGetParameter) {
00250   if (resultCode != 0) {
00251     // The periodic 'liveness' command failed, suggesting that the back-end stream is no longer alive.
00252     // We handle this by resetting our connection state with this server.  Any current clients will be closed, but
00253     // subsequent clients will cause new RTSP "SETUP"s and "PLAY"s to get done, restarting the stream.
00254     // Then continue by sending more "DESCRIBE" commands, to try to restore the stream.
00255 
00256     fServerSupportsGetParameter = False; // until we learn otherwise, in response to a future "OPTIONS" command
00257 
00258     if (resultCode < 0) {
00259       // The 'liveness' command failed without getting a response from the server (otherwise "resultCode" would have been > 0).
00260       // This suggests that the RTSP connection itself has failed.  Print this error code, in case it's useful for debugging:
00261       if (fVerbosityLevel > 0) {
00262         envir() << *this << ": lost connection to server ('errno': " << -resultCode << ").  Resetting...\n";
00263       }
00264     }
00265 
00266     reset();
00267     fOurServerMediaSession.resetDESCRIBEState();
00268 
00269     setBaseURL(fOurURL); // because we'll be sending an initial "DESCRIBE" all over again
00270     sendDESCRIBE(this);
00271     return;
00272   }
00273 
00274   fServerSupportsGetParameter = serverSupportsGetParameter;
00275 
00276   // Schedule the next 'liveness' command (i.e., to tell the back-end server that we're still alive):
00277   scheduleLivenessCommand();
00278 }
00279 
00280 #define SUBSESSION_TIMEOUT_SECONDS 10 // how many seconds to wait for the last track's "SETUP" to be done (note below)
00281 
00282 void ProxyRTSPClient::continueAfterSETUP() {
00283   if (fVerbosityLevel > 0) {
00284     envir() << *this << "::continueAfterSETUP(): head codec: " << fSetupQueueHead->fClientMediaSubsession.codecName()
00285             << "; numSubsessions " << fSetupQueueHead->fParentSession->numSubsessions() << "\n\tqueue:";
00286     for (ProxyServerMediaSubsession* p = fSetupQueueHead; p != NULL; p = p->fNext) {
00287       envir() << "\t" << p->fClientMediaSubsession.codecName();
00288     }
00289     envir() << "\n";
00290   }
00291   envir().taskScheduler().unscheduleDelayedTask(fSubsessionTimerTask); // in case it had been set
00292 
00293   // Dequeue the first "ProxyServerMediaSubsession" from our 'SETUP queue'.  It will be the one for which this "SETUP" was done:
00294   ProxyServerMediaSubsession* smss = fSetupQueueHead; // Assert: != NULL
00295   fSetupQueueHead = fSetupQueueHead->fNext;
00296   if (fSetupQueueHead == NULL) fSetupQueueTail = NULL;
00297 
00298   if (fSetupQueueHead != NULL) {
00299     // There are still entries in the queue, for tracks for which we have still to do a "SETUP".
00300     // "SETUP" the first of these now:
00301     sendSetupCommand(fSetupQueueHead->fClientMediaSubsession, ::continueAfterSETUP,
00302                      False, fStreamRTPOverTCP, False, fOurAuthenticator);
00303     ++fNumSetupsDone;
00304     fSetupQueueHead->fHaveSetupStream = True;
00305   } else {
00306     if (fNumSetupsDone >= smss->fParentSession->numSubsessions()) {
00307       // We've now finished setting up each of our subsessions (i.e., 'tracks').
00308       // Continue by sending a "PLAY" command (an 'aggregate' "PLAY" command, on the whole session):
00309       sendPlayCommand(smss->fClientMediaSubsession.parentSession(), NULL, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
00310           // the "-1.0f" "start" parameter causes the "PLAY" to be sent without a "Range:" header, in case we'd already done
00311           // a "PLAY" before (as a result of a 'subsession timeout' (note below))
00312       fLastCommandWasPLAY = True;
00313     } else {
00314       // Some of this session's subsessions (i.e., 'tracks') remain to be "SETUP".  They might get "SETUP" very soon, but it's
00315       // also possible - if the remote client chose to play only some of the session's tracks - that they might not.
00316       // To allow for this possibility, we set a timer.  If the timer expires without the remaining subsessions getting "SETUP",
00317       // then we send a "PLAY" command anyway:
00318       fSubsessionTimerTask
00319         = envir().taskScheduler().scheduleDelayedTask(SUBSESSION_TIMEOUT_SECONDS*MILLION, (TaskFunc*)subsessionTimeout, this);
00320     }
00321   }
00322 }
00323 
00324 void ProxyRTSPClient::scheduleLivenessCommand() {
00325   // Delay a random time before sending another 'liveness' command.
00326   unsigned delayMax = sessionTimeoutParameter(); // if the server specified a maximum time between 'liveness' probes, then use that
00327   if (delayMax == 0) {
00328     delayMax = 60;
00329   }
00330 
00331   // Choose a random time from [delayMax/2,delayMax) seconds:
00332   unsigned const dM5 = delayMax*500000;
00333   unsigned uSecondsToDelay = dM5 + (dM5*our_random())%dM5;
00334   fLivenessCommandTask = envir().taskScheduler().scheduleDelayedTask(uSecondsToDelay, sendLivenessCommand, this);
00335 }
00336 
00337 void ProxyRTSPClient::sendLivenessCommand(void* clientData) {
00338   ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;
00339 
00340   // Note.  By default, we do not send "GET_PARAMETER" as our 'liveness notification' command, even if the server previously
00341   // indicated (in its response to our earlier "OPTIONS" command) that it supported "GET_PARAMETER".  This is because
00342   // "GET_PARAMETER" crashes some camera servers (even though they claimed to support "GET_PARAMETER").
00343 #ifdef SEND_GET_PARAMETER_IF_SUPPORTED
00344   MediaSession* sess = rtspClient->fOurServerMediaSession.fClientMediaSession;
00345 
00346   if (rtspClient->fServerSupportsGetParameter && rtspClient->fNumSetupsDone > 0 && sess != NULL) {
00347     rtspClient->sendGetParameterCommand(*sess, ::continueAfterGET_PARAMETER, "", rtspClient->auth());
00348   } else {
00349 #endif
00350     rtspClient->sendOptionsCommand(::continueAfterOPTIONS, rtspClient->auth());
00351 #ifdef SEND_GET_PARAMETER_IF_SUPPORTED
00352   }
00353 #endif
00354 }
00355 
00356 void ProxyRTSPClient::scheduleDESCRIBECommand() {
00357   // Delay 1s, 2s, 4s, 8s ... 256s until sending the next "DESCRIBE".  Then, keep delaying a random time from [256..511] seconds:
00358   unsigned secondsToDelay;
00359   if (fNextDESCRIBEDelay <= 256) {
00360     secondsToDelay = fNextDESCRIBEDelay;
00361     fNextDESCRIBEDelay *= 2;
00362   } else {
00363     secondsToDelay = 256 + (our_random()&0xFF); // [256..511] seconds
00364   }
00365 
00366   if (fVerbosityLevel > 0) {
00367     envir() << *this << ": RTSP \"DESCRIBE\" command failed; trying again in " << secondsToDelay << " seconds\n";
00368   }
00369   fDESCRIBECommandTask = envir().taskScheduler().scheduleDelayedTask(secondsToDelay*MILLION, sendDESCRIBE, this);
00370 }
00371 
00372 void ProxyRTSPClient::sendDESCRIBE(void* clientData) {
00373   ProxyRTSPClient* rtspClient = (ProxyRTSPClient*)clientData;
00374   if (rtspClient != NULL) rtspClient->sendDescribeCommand(::continueAfterDESCRIBE, rtspClient->auth());
00375 }
00376 
00377 void ProxyRTSPClient::subsessionTimeout(void* clientData) {
00378   ((ProxyRTSPClient*)clientData)->handleSubsessionTimeout();
00379 }
00380 
00381 void ProxyRTSPClient::handleSubsessionTimeout() {
00382   // We still have one or more subsessions ('tracks') left to "SETUP".  But we can't wait any longer for them.  Send a "PLAY" now:
00383   MediaSession* sess = fOurServerMediaSession.fClientMediaSession;
00384   if (sess != NULL) sendPlayCommand(*sess, NULL, -1.0f, -1.0f, 1.0f, fOurAuthenticator);
00385   fLastCommandWasPLAY = True;
00386 }
00387 
00388 
00390 
00391 ProxyServerMediaSubsession::ProxyServerMediaSubsession(MediaSubsession& mediaSubsession)
00392   : OnDemandServerMediaSubsession(mediaSubsession.parentSession().envir(), True/*reuseFirstSource*/),
00393     fClientMediaSubsession(mediaSubsession), fNext(NULL), fHaveSetupStream(False) {
00394 }
00395 
00396 UsageEnvironment& operator<<(UsageEnvironment& env, const ProxyServerMediaSubsession& psmss) { // used for debugging
00397   return env << "ProxyServerMediaSubsession[\"" << psmss.codecName() << "\"]";
00398 }
00399 
00400 ProxyServerMediaSubsession::~ProxyServerMediaSubsession() {
00401   if (verbosityLevel() > 0) {
00402     envir() << *this << "::~ProxyServerMediaSubsession()\n";
00403   }
00404 }
00405 
00406 FramedSource* ProxyServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) {
00407   ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
00408 
00409   if (verbosityLevel() > 0) {
00410     envir() << *this << "::createNewStreamSource(session id " << clientSessionId << ")\n";
00411   }
00412 
00413   // If we haven't yet created a data source from our 'media subsession' object, initiate() it to do so:
00414   if (fClientMediaSubsession.readSource() == NULL) {
00415     fClientMediaSubsession.receiveRawMP3ADUs(); // hack for MPA-ROBUST streams
00416     fClientMediaSubsession.receiveRawJPEGFrames(); // hack for proxying JPEG/RTP streams. (Don't do this if we're transcoding.)
00417     fClientMediaSubsession.initiate();
00418     if (verbosityLevel() > 0) {
00419       envir() << "\tInitiated: " << *this << "\n";
00420     }
00421 
00422     if (fClientMediaSubsession.readSource() != NULL) {
00423       // Add to the front of all data sources a filter that will 'normalize' their frames' presentation times,
00424       // before the frames get re-transmitted by our server:
00425       char const* const codecName = fClientMediaSubsession.codecName();
00426       FramedFilter* normalizerFilter = sms->fPresentationTimeSessionNormalizer
00427         ->createNewPresentationTimeSubsessionNormalizer(fClientMediaSubsession.readSource(), fClientMediaSubsession.rtpSource(),
00428                                                         codecName);
00429       fClientMediaSubsession.addFilter(normalizerFilter);
00430 
00431       // Some data sources require a 'framer' object to be added, before they can be fed into a "RTPSink".  Adjust for this now:
00432       if (strcmp(codecName, "H264") == 0) {
00433         fClientMediaSubsession.addFilter(H264VideoStreamDiscreteFramer::createNew(envir(), fClientMediaSubsession.readSource()));
00434       } else if (strcmp(codecName, "MP4V-ES") == 0) {
00435         fClientMediaSubsession.addFilter(MPEG4VideoStreamDiscreteFramer
00436                                          ::createNew(envir(), fClientMediaSubsession.readSource(), True/* leave PTs unmodified*/));
00437       } else if (strcmp(codecName, "MPV") == 0) {
00438         fClientMediaSubsession.addFilter(MPEG1or2VideoStreamDiscreteFramer::createNew(envir(), fClientMediaSubsession.readSource(),
00439                                                                                       False, 5.0, True/* leave PTs unmodified*/));
00440       } else if (strcmp(codecName, "DV") == 0) {
00441         fClientMediaSubsession.addFilter(DVVideoStreamFramer::createNew(envir(), fClientMediaSubsession.readSource(),
00442                                                                         False, True/* leave PTs unmodified*/));
00443       }
00444     }
00445 
00446     if (fClientMediaSubsession.rtcpInstance() != NULL) {
00447       fClientMediaSubsession.rtcpInstance()->setByeHandler(subsessionByeHandler, this);
00448     }
00449   }
00450 
00451   ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
00452   if (clientSessionId != 0) {
00453     // We're being called as a result of implementing a RTSP "SETUP".
00454     if (!fHaveSetupStream) {
00455       // This is our first "SETUP".  Send RTSP "SETUP" and later "PLAY" commands to the proxied server, to start streaming:
00456       // (Before sending "SETUP", enqueue ourselves on the "RTSPClient"s 'SETUP queue', so we'll be able to get the correct
00457       //  "ProxyServerMediaSubsession" to handle the response.  (Note that responses come back in the same order as requests.))
00458       Boolean queueWasEmpty = proxyRTSPClient->fSetupQueueHead == NULL;
00459       if (queueWasEmpty) {
00460         proxyRTSPClient->fSetupQueueHead = this;
00461       } else {
00462         proxyRTSPClient->fSetupQueueTail->fNext = this;
00463       }
00464       proxyRTSPClient->fSetupQueueTail = this;
00465 
00466       // Hack: If there's already a pending "SETUP" request (for another track), don't send this track's "SETUP" right away, because
00467       // the server might not properly handle 'pipelined' requests.  Instead, wait until after previous "SETUP" responses come back.
00468       if (queueWasEmpty) {
00469         proxyRTSPClient->sendSetupCommand(fClientMediaSubsession, ::continueAfterSETUP,
00470                                           False, proxyRTSPClient->fStreamRTPOverTCP, False, proxyRTSPClient->auth());
00471         ++proxyRTSPClient->fNumSetupsDone;
00472         fHaveSetupStream = True;
00473       }
00474     } else {
00475       // This is a "SETUP" from a new client.  We know that there are no other currently active clients (otherwise we wouldn't
00476       // have been called here), so we know that the substream was previously "PAUSE"d.  Send "PLAY" downstream once again,
00477       // to resume the stream:
00478       if (!proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PLAY"; not one for each subsession
00479         proxyRTSPClient->sendPlayCommand(fClientMediaSubsession.parentSession(), NULL, -1.0f/*resume from previous point*/,
00480                                          -1.0f, 1.0f, proxyRTSPClient->auth());
00481         proxyRTSPClient->fLastCommandWasPLAY = True;
00482       }
00483     }
00484   }
00485 
00486   estBitrate = fClientMediaSubsession.bandwidth();
00487   if (estBitrate == 0) estBitrate = 50; // kbps, estimate
00488   return fClientMediaSubsession.readSource();
00489 }
00490 
00491 void ProxyServerMediaSubsession::closeStreamSource(FramedSource* inputSource) {
00492   if (verbosityLevel() > 0) {
00493     envir() << *this << "::closeStreamSource()\n";
00494   }
00495   // Because there's only one input source for this 'subsession' (regardless of how many downstream clients are proxying it),
00496   // we don't close the input source here.  (Instead, we wait until *this* object gets deleted.)
00497   // However, because (as evidenced by this function having been called) we no longer have any clients accessing the stream,
00498   // then we "PAUSE" the downstream proxied stream, until a new client arrives:
00499   if (fHaveSetupStream) {
00500     ProxyServerMediaSession* const sms = (ProxyServerMediaSession*)fParentSession;
00501     ProxyRTSPClient* const proxyRTSPClient = sms->fProxyRTSPClient;
00502     if (proxyRTSPClient->fLastCommandWasPLAY) { // so that we send only one "PAUSE"; not one for each subsession
00503       proxyRTSPClient->sendPauseCommand(fClientMediaSubsession.parentSession(), NULL, proxyRTSPClient->auth());
00504       proxyRTSPClient->fLastCommandWasPLAY = False;
00505     }
00506   }
00507 }
00508 
00509 RTPSink* ProxyServerMediaSubsession
00510 ::createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) {
00511   if (verbosityLevel() > 0) {
00512     envir() << *this << "::createNewRTPSink()\n";
00513   }
00514 
00515   // Create (and return) the appropriate "RTPSink" object for our codec:
00516   RTPSink* newSink;
00517   char const* const codecName = fClientMediaSubsession.codecName();
00518   if (strcmp(codecName, "AC3") == 0 || strcmp(codecName, "EAC3") == 0) {
00519     newSink = AC3AudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00520                                          fClientMediaSubsession.rtpTimestampFrequency()); 
00521 #if 0 // This code does not work; do *not* enable it:
00522   } else if (strcmp(codecName, "AMR") == 0 || strcmp(codecName, "AMR-WB") == 0) {
00523     Boolean isWideband = strcmp(codecName, "AMR-WB") == 0;
00524     newSink = AMRAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00525                                          isWideband, fClientMediaSubsession.numChannels());
00526 #endif
00527   } else if (strcmp(codecName, "DV") == 0) {
00528     newSink = DVVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
00529   } else if (strcmp(codecName, "GSM") == 0) {
00530     newSink = GSMAudioRTPSink::createNew(envir(), rtpGroupsock);
00531   } else if (strcmp(codecName, "H263-1998") == 0 || strcmp(codecName, "H263-2000") == 0) {
00532     newSink = H263plusVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00533                                               fClientMediaSubsession.rtpTimestampFrequency()); 
00534   } else if (strcmp(codecName, "H264") == 0) {
00535     newSink = H264VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00536                                           fClientMediaSubsession.fmtp_spropparametersets());
00537   } else if (strcmp(codecName, "JPEG") == 0) {
00538     newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock, 26, 90000, "video", "JPEG",
00539                                        1/*numChannels*/, False/*allowMultipleFramesPerPacket*/, False/*doNormalMBitRule*/);
00540   } else if (strcmp(codecName, "MP4A-LATM") == 0) {
00541     newSink = MPEG4LATMAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00542                                                fClientMediaSubsession.rtpTimestampFrequency(),
00543                                                fClientMediaSubsession.fmtp_config(),
00544                                                fClientMediaSubsession.numChannels());
00545   } else if (strcmp(codecName, "MP4V-ES") == 0) {
00546     newSink = MPEG4ESVideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00547                                              fClientMediaSubsession.rtpTimestampFrequency(),
00548                                              fClientMediaSubsession.fmtp_profile_level_id(), fClientMediaSubsession.fmtp_config()); 
00549   } else if (strcmp(codecName, "MPA") == 0) {
00550     newSink = MPEG1or2AudioRTPSink::createNew(envir(), rtpGroupsock);
00551   } else if (strcmp(codecName, "MPA-ROBUST") == 0) {
00552     newSink = MP3ADURTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
00553   } else if (strcmp(codecName, "MPEG4-GENERIC") == 0) {
00554     newSink = MPEG4GenericRTPSink::createNew(envir(), rtpGroupsock,
00555                                              rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(),
00556                                              fClientMediaSubsession.mediumName(), fClientMediaSubsession.fmtp_mode(),
00557                                              fClientMediaSubsession.fmtp_config(), fClientMediaSubsession.numChannels());
00558   } else if (strcmp(codecName, "MPV") == 0) {
00559     newSink = MPEG1or2VideoRTPSink::createNew(envir(), rtpGroupsock);
00560   } else if (strcmp(codecName, "T140") == 0) {
00561     newSink = T140TextRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
00562   } else if (strcmp(codecName, "VORBIS") == 0) {
00563     newSink = VorbisAudioRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic,
00564                                             fClientMediaSubsession.rtpTimestampFrequency(), fClientMediaSubsession.numChannels(),
00565                                             fClientMediaSubsession.fmtp_config()); 
00566   } else if (strcmp(codecName, "VP8") == 0) {
00567     newSink = VP8VideoRTPSink::createNew(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic);
00568   } else if (strcmp(codecName, "AMR") == 0 || strcmp(codecName, "AMR-WB") == 0) {
00569     // Proxying of these codecs is currently *not* supported, because the data received by the "RTPSource" object is not in a
00570     // form that can be fed directly into a corresponding "RTPSink" object.
00571     if (verbosityLevel() > 0) {
00572       envir() << "\treturns NULL (because we currently don't support the proxying of \""
00573               << fClientMediaSubsession.mediumName() << "/" << codecName << "\" streams)\n";
00574     }
00575     return NULL;
00576   } else if (strcmp(codecName, "QCELP") == 0 ||
00577              strcmp(codecName, "H261") == 0 ||
00578              strcmp(codecName, "H263-1998") == 0 || strcmp(codecName, "H263-2000") == 0 ||
00579              strcmp(codecName, "X-QT") == 0 || strcmp(codecName, "X-QUICKTIME") == 0) {
00580     // This codec requires a specialized RTP payload format; however, we don't yet have an appropriate "RTPSink" subclass for it:
00581     if (verbosityLevel() > 0) {
00582       envir() << "\treturns NULL (because we don't have a \"RTPSink\" subclass for this RTP payload format)\n";
00583     }
00584     return NULL;
00585   } else {
00586     // This codec is assumed to have a simple RTP payload format that can be implemented just with a "SimpleRTPSink":
00587     Boolean allowMultipleFramesPerPacket = True; // by default
00588     Boolean doNormalMBitRule = True; // by default
00589     // Some codecs change the above default parameters:
00590     if (strcmp(codecName, "MP2T") == 0) {
00591       doNormalMBitRule = False; // no RTP 'M' bit
00592     }
00593     newSink = SimpleRTPSink::createNew(envir(), rtpGroupsock,
00594                                        rtpPayloadTypeIfDynamic, fClientMediaSubsession.rtpTimestampFrequency(),
00595                                        fClientMediaSubsession.mediumName(), fClientMediaSubsession.codecName(),
00596                                        fClientMediaSubsession.numChannels(), allowMultipleFramesPerPacket, doNormalMBitRule);
00597   }
00598 
00599   // Because our relayed frames' presentation times are inaccurate until the input frames have been RTCP-synchronized,
00600   // we temporarily disable RTCP "SR" reports for this "RTPSink" object:
00601   newSink->enableRTCPReports() = False;
00602 
00603   // Also tell our "PresentationTimeSubsessionNormalizer" object about the "RTPSink", so it can enable RTCP "SR" reports later:
00604   PresentationTimeSubsessionNormalizer* ssNormalizer;
00605   if (strcmp(codecName, "H264") == 0 ||
00606       strcmp(codecName, "MP4V-ES") == 0 ||
00607       strcmp(codecName, "MPV") == 0 ||
00608       strcmp(codecName, "DV") == 0) {
00609     // There was a separate 'framer' object in front of the "PresentationTimeSubsessionNormalizer", so go back one object to get it:
00610     ssNormalizer = (PresentationTimeSubsessionNormalizer*)(((FramedFilter*)inputSource)->inputSource());
00611   } else {
00612     ssNormalizer = (PresentationTimeSubsessionNormalizer*)inputSource;
00613   }
00614   ssNormalizer->setRTPSink(newSink);
00615 
00616   return newSink;
00617 }
00618 
00619 void ProxyServerMediaSubsession::subsessionByeHandler(void* clientData) {
00620   ((ProxyServerMediaSubsession*)clientData)->subsessionByeHandler();
00621 }
00622 
00623 void ProxyServerMediaSubsession::subsessionByeHandler() {
00624   if (verbosityLevel() > 0) {
00625     envir() << *this << ": received RTCP \"BYE\"\n";
00626   }
00627 
00628   // This "BYE" signals that our input source has (effectively) closed, so handle this accordingly:
00629   FramedSource::handleClosure(fClientMediaSubsession.readSource());
00630 
00631   // Then, close our input source for real:
00632   fClientMediaSubsession.deInitiate();
00633 }
00634 
00635 
00637 
00638 // PresentationTimeSessionNormalizer:
00639 
00640 PresentationTimeSessionNormalizer::PresentationTimeSessionNormalizer(UsageEnvironment& env)
00641   : Medium(env),
00642     fSubsessionNormalizers(NULL), fMasterSSNormalizer(NULL) {
00643 }
00644 
00645 PresentationTimeSessionNormalizer::~PresentationTimeSessionNormalizer() {
00646   while (fSubsessionNormalizers != NULL) {
00647     delete fSubsessionNormalizers;
00648   }
00649 }
00650 
00651 PresentationTimeSubsessionNormalizer*
00652 PresentationTimeSessionNormalizer::createNewPresentationTimeSubsessionNormalizer(FramedSource* inputSource, RTPSource* rtpSource,
00653                                                                                  char const* codecName) {
00654   fSubsessionNormalizers
00655     = new PresentationTimeSubsessionNormalizer(*this, inputSource, rtpSource, codecName, fSubsessionNormalizers);
00656   return fSubsessionNormalizers;
00657 }
00658 
00659 void PresentationTimeSessionNormalizer::normalizePresentationTime(PresentationTimeSubsessionNormalizer* ssNormalizer,
00660                                                                   struct timeval& toPT, struct timeval const& fromPT) {
00661   Boolean const hasBeenSynced = ssNormalizer->fRTPSource->hasBeenSynchronizedUsingRTCP();
00662 
00663   if (!hasBeenSynced) {
00664     // If "fromPT" has not yet been RTCP-synchronized, then it was generated by our own receiving code, and thus
00665     // is already aligned with 'wall-clock' time.  Just copy it 'as is' to "toPT":
00666     toPT = fromPT;
00667   } else {
00668     if (fMasterSSNormalizer == NULL) {
00669       // Make "ssNormalizer" the 'master' subsession - meaning that its presentation time is adjusted to align with 'wall clock'
00670       // time, and the presentation times of other subsessions (if any) are adjusted to retain their relative separation with
00671       // those of the master:
00672       fMasterSSNormalizer = ssNormalizer;
00673 
00674       struct timeval timeNow;
00675       gettimeofday(&timeNow, NULL);
00676 
00677       // Compute: fPTAdjustment = timeNow - fromPT
00678       fPTAdjustment.tv_sec = timeNow.tv_sec - fromPT.tv_sec;
00679       fPTAdjustment.tv_usec = timeNow.tv_usec - fromPT.tv_usec;
00680       // Note: It's OK if one or both of these fields underflows; the result still works out OK later.
00681     }
00682 
00683     // Compute a normalized presentation time: toPT = fromPT + fPTAdjustment
00684     toPT.tv_sec = fromPT.tv_sec + fPTAdjustment.tv_sec - 1;
00685     toPT.tv_usec = fromPT.tv_usec + fPTAdjustment.tv_usec + MILLION;
00686     while (toPT.tv_usec > MILLION) { ++toPT.tv_sec; toPT.tv_usec -= MILLION; }
00687 
00688     // Because "ssNormalizer"s relayed presentation times are accurate from now on, enable RTCP "SR" reports for its "RTPSink":
00689     RTPSink* const rtpSink = ssNormalizer->fRTPSink;
00690     if (rtpSink != NULL) { // sanity check; should always be true
00691       rtpSink->enableRTCPReports() = True;
00692     }
00693   }
00694 }
00695 
00696 void PresentationTimeSessionNormalizer
00697 ::removePresentationTimeSubsessionNormalizer(PresentationTimeSubsessionNormalizer* ssNormalizer) {
00698   // Unlink "ssNormalizer" from the linked list (starting with "fSubsessionNormalizers"):
00699   if (fSubsessionNormalizers == ssNormalizer) {
00700     fSubsessionNormalizers = fSubsessionNormalizers->fNext;
00701   } else {
00702     PresentationTimeSubsessionNormalizer** ssPtrPtr = &(fSubsessionNormalizers->fNext);
00703     while (*ssPtrPtr != ssNormalizer) ssPtrPtr = &((*ssPtrPtr)->fNext);
00704     *ssPtrPtr = (*ssPtrPtr)->fNext;
00705   }
00706 }
00707 
00708 // PresentationTimeSubsessionNormalizer:
00709 
00710 PresentationTimeSubsessionNormalizer
00711 ::PresentationTimeSubsessionNormalizer(PresentationTimeSessionNormalizer& parent, FramedSource* inputSource, RTPSource* rtpSource,
00712                                        char const* codecName, PresentationTimeSubsessionNormalizer* next)
00713   : FramedFilter(parent.envir(), inputSource),
00714     fParent(parent), fRTPSource(rtpSource), fRTPSink(NULL), fCodecName(codecName), fNext(next) {
00715 }
00716 
00717 PresentationTimeSubsessionNormalizer::~PresentationTimeSubsessionNormalizer() {
00718   fParent.removePresentationTimeSubsessionNormalizer(this);
00719 }
00720 
00721 void PresentationTimeSubsessionNormalizer::afterGettingFrame(void* clientData, unsigned frameSize,
00722                                                              unsigned numTruncatedBytes,
00723                                                              struct timeval presentationTime,
00724                                                              unsigned durationInMicroseconds) {
00725   ((PresentationTimeSubsessionNormalizer*)clientData)
00726     ->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
00727 }
00728 
00729 void PresentationTimeSubsessionNormalizer::afterGettingFrame(unsigned frameSize,
00730                                                              unsigned numTruncatedBytes,
00731                                                              struct timeval presentationTime,
00732                                                              unsigned durationInMicroseconds) {
00733   // This filter is implemented by passing all frames through unchanged, except that "fPresentationTime" is changed:
00734   fFrameSize = frameSize;
00735   fNumTruncatedBytes = numTruncatedBytes;
00736   fDurationInMicroseconds = durationInMicroseconds;
00737 
00738   fParent.normalizePresentationTime(this, fPresentationTime, presentationTime);
00739 
00740   // Hack for JPEG/RTP proxying.  Because we're proxying JPEG by just copying the raw JPEG/RTP payloads, without interpreting them,
00741   // we need to also 'copy' the RTP 'M' (marker) bit from the "RTPSource" to the "RTPSink":
00742   if (fRTPSource->curPacketMarkerBit() && strcmp(fCodecName, "JPEG") == 0) ((SimpleRTPSink*)fRTPSink)->setMBitOnNextPacket();
00743 
00744   // Complete delivery:
00745   FramedSource::afterGetting(this);
00746 }
00747 
00748 void PresentationTimeSubsessionNormalizer::doGetNextFrame() {
00749   fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this, FramedSource::handleClosure, this);
00750 }

Generated on Tue Jun 18 13:16:52 2013 for live by  doxygen 1.5.2