00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "OnDemandServerMediaSubsession.hh"
00023 #include <GroupsockHelper.hh>
00024
00025 OnDemandServerMediaSubsession
00026 ::OnDemandServerMediaSubsession(UsageEnvironment& env,
00027 Boolean reuseFirstSource,
00028 portNumBits initialPortNum)
00029 : ServerMediaSubsession(env),
00030 fSDPLines(NULL), fReuseFirstSource(reuseFirstSource), fInitialPortNum(initialPortNum), fLastStreamToken(NULL) {
00031 fDestinationsHashTable = HashTable::create(ONE_WORD_HASH_KEYS);
00032 gethostname(fCNAME, sizeof fCNAME);
00033 fCNAME[sizeof fCNAME-1] = '\0';
00034 }
00035
00036 OnDemandServerMediaSubsession::~OnDemandServerMediaSubsession() {
00037 delete[] fSDPLines;
00038
00039
00040 while (1) {
00041 Destinations* destinations
00042 = (Destinations*)(fDestinationsHashTable->RemoveNext());
00043 if (destinations == NULL) break;
00044 delete destinations;
00045 }
00046 delete fDestinationsHashTable;
00047 }
00048
00049 char const*
00050 OnDemandServerMediaSubsession::sdpLines() {
00051 if (fSDPLines == NULL) {
00052
00053
00054
00055
00056 unsigned estBitrate;
00057 FramedSource* inputSource = createNewStreamSource(0, estBitrate);
00058 if (inputSource == NULL) return NULL;
00059
00060 struct in_addr dummyAddr;
00061 dummyAddr.s_addr = 0;
00062 Groupsock dummyGroupsock(envir(), dummyAddr, 0, 0);
00063 unsigned char rtpPayloadType = 96 + trackNumber()-1;
00064 RTPSink* dummyRTPSink
00065 = createNewRTPSink(&dummyGroupsock, rtpPayloadType, inputSource);
00066
00067 setSDPLinesFromRTPSink(dummyRTPSink, inputSource, estBitrate);
00068 Medium::close(dummyRTPSink);
00069 closeStreamSource(inputSource);
00070 }
00071
00072 return fSDPLines;
00073 }
00074
00075 void OnDemandServerMediaSubsession
00076 ::getStreamParameters(unsigned clientSessionId,
00077 netAddressBits clientAddress,
00078 Port const& clientRTPPort,
00079 Port const& clientRTCPPort,
00080 int tcpSocketNum,
00081 unsigned char rtpChannelId,
00082 unsigned char rtcpChannelId,
00083 netAddressBits& destinationAddress,
00084 u_int8_t& ,
00085 Boolean& isMulticast,
00086 Port& serverRTPPort,
00087 Port& serverRTCPPort,
00088 void*& streamToken) {
00089 if (destinationAddress == 0) destinationAddress = clientAddress;
00090 struct in_addr destinationAddr; destinationAddr.s_addr = destinationAddress;
00091 isMulticast = False;
00092
00093 if (fLastStreamToken != NULL && fReuseFirstSource) {
00094
00095
00096 serverRTPPort = ((StreamState*)fLastStreamToken)->serverRTPPort();
00097 serverRTCPPort = ((StreamState*)fLastStreamToken)->serverRTCPPort();
00098 ++((StreamState*)fLastStreamToken)->referenceCount();
00099 streamToken = fLastStreamToken;
00100 } else {
00101
00102 unsigned streamBitrate;
00103 FramedSource* mediaSource
00104 = createNewStreamSource(clientSessionId, streamBitrate);
00105
00106
00107
00108 RTPSink* rtpSink;
00109 BasicUDPSink* udpSink;
00110 Groupsock* rtpGroupsock;
00111 Groupsock* rtcpGroupsock;
00112 portNumBits serverPortNum;
00113 if (clientRTCPPort.num() == 0) {
00114
00115 NoReuse dummy(envir());
00116 for (serverPortNum = fInitialPortNum; ; ++serverPortNum) {
00117 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00118
00119 serverRTPPort = serverPortNum;
00120 rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00121 if (rtpGroupsock->socketNum() >= 0) break;
00122 }
00123
00124 rtcpGroupsock = NULL;
00125 rtpSink = NULL;
00126 udpSink = BasicUDPSink::createNew(envir(), rtpGroupsock);
00127 } else {
00128
00129
00130 NoReuse dummy(envir());
00131 for (portNumBits serverPortNum = fInitialPortNum; ; serverPortNum += 2) {
00132 struct in_addr dummyAddr; dummyAddr.s_addr = 0;
00133
00134 serverRTPPort = serverPortNum;
00135 rtpGroupsock = new Groupsock(envir(), dummyAddr, serverRTPPort, 255);
00136 if (rtpGroupsock->socketNum() < 0) {
00137 delete rtpGroupsock;
00138 continue;
00139 }
00140
00141 serverRTCPPort = serverPortNum+1;
00142 rtcpGroupsock = new Groupsock(envir(), dummyAddr, serverRTCPPort, 255);
00143 if (rtcpGroupsock->socketNum() < 0) {
00144 delete rtpGroupsock;
00145 delete rtcpGroupsock;
00146 continue;
00147 }
00148
00149 break;
00150 }
00151
00152 unsigned char rtpPayloadType = 96 + trackNumber()-1;
00153 rtpSink = createNewRTPSink(rtpGroupsock, rtpPayloadType, mediaSource);
00154 udpSink = NULL;
00155 }
00156
00157
00158
00159 if (rtpGroupsock != NULL) rtpGroupsock->removeAllDestinations();
00160 if (rtcpGroupsock != NULL) rtcpGroupsock->removeAllDestinations();
00161
00162 if (rtpGroupsock != NULL) {
00163
00164
00165 unsigned rtpBufSize = streamBitrate * 25 / 2;
00166 if (rtpBufSize < 50 * 1024) rtpBufSize = 50 * 1024;
00167 increaseSendBufferTo(envir(), rtpGroupsock->socketNum(), rtpBufSize);
00168 }
00169
00170
00171 streamToken = fLastStreamToken
00172 = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
00173 streamBitrate, mediaSource,
00174 rtpGroupsock, rtcpGroupsock);
00175 }
00176
00177
00178 Destinations* destinations;
00179 if (tcpSocketNum < 0) {
00180 destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort);
00181 } else {
00182 destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
00183 }
00184 fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
00185 }
00186
00187 void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
00188 void* streamToken,
00189 TaskFunc* rtcpRRHandler,
00190 void* rtcpRRHandlerClientData,
00191 unsigned short& rtpSeqNum,
00192 unsigned& rtpTimestamp,
00193 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
00194 void* serverRequestAlternativeByteHandlerClientData) {
00195 StreamState* streamState = (StreamState*)streamToken;
00196 Destinations* destinations
00197 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00198 if (streamState != NULL) {
00199 streamState->startPlaying(destinations,
00200 rtcpRRHandler, rtcpRRHandlerClientData,
00201 serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
00202 RTPSink* rtpSink = streamState->rtpSink();
00203 if (rtpSink != NULL) {
00204 rtpSeqNum = rtpSink->currentSeqNo();
00205 rtpTimestamp = rtpSink->presetNextTimestamp();
00206 }
00207 }
00208 }
00209
00210 void OnDemandServerMediaSubsession::pauseStream(unsigned ,
00211 void* streamToken) {
00212
00213
00214 if (fReuseFirstSource) return;
00215
00216 StreamState* streamState = (StreamState*)streamToken;
00217 if (streamState != NULL) streamState->pause();
00218 }
00219
00220 void OnDemandServerMediaSubsession::seekStream(unsigned ,
00221 void* streamToken, double& seekNPT, double streamDuration, u_int64_t& numBytes) {
00222 numBytes = 0;
00223
00224
00225 if (fReuseFirstSource) return;
00226
00227 StreamState* streamState = (StreamState*)streamToken;
00228 if (streamState != NULL && streamState->mediaSource() != NULL) {
00229 seekStreamSource(streamState->mediaSource(), seekNPT, streamDuration, numBytes);
00230
00231 streamState->startNPT() = (float)seekNPT;
00232 RTPSink* rtpSink = streamState->rtpSink();
00233 if (rtpSink != NULL) rtpSink->resetPresentationTimes();
00234 }
00235 }
00236
00237 void OnDemandServerMediaSubsession::seekStream(unsigned ,
00238 void* streamToken, char*& absStart, char*& absEnd) {
00239
00240 if (fReuseFirstSource) return;
00241
00242 StreamState* streamState = (StreamState*)streamToken;
00243 if (streamState != NULL && streamState->mediaSource() != NULL) {
00244 seekStreamSource(streamState->mediaSource(), absStart, absEnd);
00245 }
00246 }
00247
00248 void OnDemandServerMediaSubsession::nullSeekStream(unsigned , void* streamToken) {
00249 StreamState* streamState = (StreamState*)streamToken;
00250 if (streamState != NULL && streamState->mediaSource() != NULL) {
00251
00252 streamState->startNPT() = getCurrentNPT(streamToken);
00253 RTPSink* rtpSink = streamState->rtpSink();
00254 if (rtpSink != NULL) rtpSink->resetPresentationTimes();
00255 }
00256 }
00257
00258 void OnDemandServerMediaSubsession::setStreamScale(unsigned ,
00259 void* streamToken, float scale) {
00260
00261
00262 if (fReuseFirstSource) return;
00263
00264 StreamState* streamState = (StreamState*)streamToken;
00265 if (streamState != NULL && streamState->mediaSource() != NULL) {
00266 setStreamSourceScale(streamState->mediaSource(), scale);
00267 }
00268 }
00269
00270 float OnDemandServerMediaSubsession::getCurrentNPT(void* streamToken) {
00271 do {
00272 if (streamToken == NULL) break;
00273
00274 StreamState* streamState = (StreamState*)streamToken;
00275 RTPSink* rtpSink = streamState->rtpSink();
00276 if (rtpSink == NULL) break;
00277
00278 return streamState->startNPT()
00279 + (rtpSink->mostRecentPresentationTime().tv_sec - rtpSink->initialPresentationTime().tv_sec)
00280 + (rtpSink->mostRecentPresentationTime().tv_sec - rtpSink->initialPresentationTime().tv_sec)/1000000.0f;
00281 } while (0);
00282
00283 return 0.0;
00284 }
00285
00286 FramedSource* OnDemandServerMediaSubsession::getStreamSource(void* streamToken) {
00287 if (streamToken == NULL) return NULL;
00288
00289 StreamState* streamState = (StreamState*)streamToken;
00290 return streamState->mediaSource();
00291 }
00292
00293 void OnDemandServerMediaSubsession::deleteStream(unsigned clientSessionId,
00294 void*& streamToken) {
00295 StreamState* streamState = (StreamState*)streamToken;
00296
00297
00298 Destinations* destinations
00299 = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
00300 if (destinations != NULL) {
00301 fDestinationsHashTable->Remove((char const*)clientSessionId);
00302
00303
00304 if (streamState != NULL) streamState->endPlaying(destinations);
00305 }
00306
00307
00308 if (streamState != NULL) {
00309 if (streamState->referenceCount() > 0) --streamState->referenceCount();
00310 if (streamState->referenceCount() == 0) {
00311 delete streamState;
00312 streamToken = NULL;
00313 }
00314 }
00315
00316
00317 delete destinations;
00318 }
00319
00320 char const* OnDemandServerMediaSubsession
00321 ::getAuxSDPLine(RTPSink* rtpSink, FramedSource* ) {
00322
00323 return rtpSink == NULL ? NULL : rtpSink->auxSDPLine();
00324 }
00325
00326 void OnDemandServerMediaSubsession::seekStreamSource(FramedSource* ,
00327 double& , double , u_int64_t& numBytes) {
00328
00329 }
00330
00331 void OnDemandServerMediaSubsession::seekStreamSource(FramedSource* ,
00332 char*& absStart, char*& absEnd) {
00333
00334 delete[] absStart; absStart = NULL;
00335 delete[] absEnd; absEnd = NULL;
00336 }
00337
00338 void OnDemandServerMediaSubsession
00339 ::setStreamSourceScale(FramedSource* , float ) {
00340
00341 }
00342
00343 void OnDemandServerMediaSubsession::closeStreamSource(FramedSource *inputSource) {
00344 Medium::close(inputSource);
00345 }
00346
00347 void OnDemandServerMediaSubsession
00348 ::setSDPLinesFromRTPSink(RTPSink* rtpSink, FramedSource* inputSource, unsigned estBitrate) {
00349 if (rtpSink == NULL) return;
00350
00351 char const* mediaType = rtpSink->sdpMediaType();
00352 unsigned char rtpPayloadType = rtpSink->rtpPayloadType();
00353 AddressString ipAddressStr(fServerAddressForSDP);
00354 char* rtpmapLine = rtpSink->rtpmapLine();
00355 char const* rangeLine = rangeSDPLine();
00356 char const* auxSDPLine = getAuxSDPLine(rtpSink, inputSource);
00357 if (auxSDPLine == NULL) auxSDPLine = "";
00358
00359 char const* const sdpFmt =
00360 "m=%s %u RTP/AVP %d\r\n"
00361 "c=IN IP4 %s\r\n"
00362 "b=AS:%u\r\n"
00363 "%s"
00364 "%s"
00365 "%s"
00366 "a=control:%s\r\n";
00367 unsigned sdpFmtSize = strlen(sdpFmt)
00368 + strlen(mediaType) + 5 + 3
00369 + strlen(ipAddressStr.val())
00370 + 20
00371 + strlen(rtpmapLine)
00372 + strlen(rangeLine)
00373 + strlen(auxSDPLine)
00374 + strlen(trackId());
00375 char* sdpLines = new char[sdpFmtSize];
00376 sprintf(sdpLines, sdpFmt,
00377 mediaType,
00378 fPortNumForSDP,
00379 rtpPayloadType,
00380 ipAddressStr.val(),
00381 estBitrate,
00382 rtpmapLine,
00383 rangeLine,
00384 auxSDPLine,
00385 trackId());
00386 delete[] (char*)rangeLine; delete[] rtpmapLine;
00387
00388 fSDPLines = strDup(sdpLines);
00389 delete[] sdpLines;
00390 }
00391
00392
00394
00395 static void afterPlayingStreamState(void* clientData) {
00396 StreamState* streamState = (StreamState*)clientData;
00397 if (streamState->streamDuration() == 0.0) {
00398
00399
00400
00401
00402 streamState->reclaim();
00403 }
00404
00405
00406
00407 }
00408
00409 StreamState::StreamState(OnDemandServerMediaSubsession& master,
00410 Port const& serverRTPPort, Port const& serverRTCPPort,
00411 RTPSink* rtpSink, BasicUDPSink* udpSink,
00412 unsigned totalBW, FramedSource* mediaSource,
00413 Groupsock* rtpGS, Groupsock* rtcpGS)
00414 : fMaster(master), fAreCurrentlyPlaying(False), fReferenceCount(1),
00415 fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort),
00416 fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(master.duration()),
00417 fTotalBW(totalBW), fRTCPInstance(NULL) ,
00418 fMediaSource(mediaSource), fStartNPT(0.0), fRTPgs(rtpGS), fRTCPgs(rtcpGS) {
00419 }
00420
00421 StreamState::~StreamState() {
00422 reclaim();
00423 }
00424
00425 void StreamState
00426 ::startPlaying(Destinations* dests,
00427 TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData,
00428 ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
00429 void* serverRequestAlternativeByteHandlerClientData) {
00430 if (dests == NULL) return;
00431
00432 if (fRTCPInstance == NULL && fRTPSink != NULL) {
00433
00434 fRTCPInstance
00435 = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
00436 fTotalBW, (unsigned char*)fMaster.fCNAME,
00437 fRTPSink, NULL );
00438
00439 }
00440
00441 if (dests->isTCP) {
00442
00443 if (fRTPSink != NULL) {
00444 fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00445 fRTPSink->setServerRequestAlternativeByteHandler(dests->tcpSocketNum, serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
00446 }
00447 if (fRTCPInstance != NULL) {
00448 fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00449 fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
00450 rtcpRRHandler, rtcpRRHandlerClientData);
00451 }
00452 } else {
00453
00454
00455 if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
00456 if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
00457 if (fRTCPInstance != NULL) {
00458 fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
00459 rtcpRRHandler, rtcpRRHandlerClientData);
00460 }
00461 }
00462
00463 if (fRTCPInstance != NULL) {
00464
00465
00466 fRTCPInstance->sendReport();
00467 }
00468
00469 if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
00470 if (fRTPSink != NULL) {
00471 fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00472 fAreCurrentlyPlaying = True;
00473 } else if (fUDPSink != NULL) {
00474 fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
00475 fAreCurrentlyPlaying = True;
00476 }
00477 }
00478 }
00479
00480 void StreamState::pause() {
00481 if (fRTPSink != NULL) fRTPSink->stopPlaying();
00482 if (fUDPSink != NULL) fUDPSink->stopPlaying();
00483 fAreCurrentlyPlaying = False;
00484 }
00485
00486 void StreamState::endPlaying(Destinations* dests) {
00487 if (fRTCPInstance != NULL) {
00488
00489
00490 fRTCPInstance->sendBYE();
00491 }
00492
00493 if (dests->isTCP) {
00494 if (fRTPSink != NULL) {
00495 fRTPSink->removeStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
00496 }
00497 if (fRTCPInstance != NULL) {
00498 fRTCPInstance->removeStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
00499 fRTCPInstance->unsetSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId);
00500 }
00501 } else {
00502
00503 if (fRTPgs != NULL) fRTPgs->removeDestination(dests->addr, dests->rtpPort);
00504 if (fRTCPgs != NULL) fRTCPgs->removeDestination(dests->addr, dests->rtcpPort);
00505 if (fRTCPInstance != NULL) {
00506 fRTCPInstance->unsetSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort);
00507 }
00508 }
00509 }
00510
00511 void StreamState::reclaim() {
00512
00513 Medium::close(fRTCPInstance) ; fRTCPInstance = NULL;
00514 Medium::close(fRTPSink); fRTPSink = NULL;
00515 Medium::close(fUDPSink); fUDPSink = NULL;
00516
00517 fMaster.closeStreamSource(fMediaSource); fMediaSource = NULL;
00518 if (fMaster.fLastStreamToken == this) fMaster.fLastStreamToken = NULL;
00519
00520 delete fRTPgs; fRTPgs = NULL;
00521 delete fRTCPgs; fRTCPgs = NULL;
00522 }