00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "RTCP.hh"
00022 #include "GroupsockHelper.hh"
00023 #include "rtcp_from_spec.h"
00024
00026
00027 class RTCPMemberDatabase {
00028 public:
00029 RTCPMemberDatabase(RTCPInstance& ourRTCPInstance)
00030 : fOurRTCPInstance(ourRTCPInstance), fNumMembers(1 ),
00031 fTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
00032 }
00033
00034 virtual ~RTCPMemberDatabase() {
00035 delete fTable;
00036 }
00037
00038 Boolean isMember(unsigned ssrc) const {
00039 return fTable->Lookup((char*)(long)ssrc) != NULL;
00040 }
00041
00042 Boolean noteMembership(unsigned ssrc, unsigned curTimeCount) {
00043 Boolean isNew = !isMember(ssrc);
00044
00045 if (isNew) {
00046 ++fNumMembers;
00047 }
00048
00049
00050 fTable->Add((char*)(long)ssrc, (void*)(long)curTimeCount);
00051
00052 return isNew;
00053 }
00054
00055 Boolean remove(unsigned ssrc) {
00056 Boolean wasPresent = fTable->Remove((char*)(long)ssrc);
00057 if (wasPresent) {
00058 --fNumMembers;
00059 }
00060 return wasPresent;
00061 }
00062
00063 unsigned numMembers() const {
00064 return fNumMembers;
00065 }
00066
00067 void reapOldMembers(unsigned threshold);
00068
00069 private:
00070 RTCPInstance& fOurRTCPInstance;
00071 unsigned fNumMembers;
00072 HashTable* fTable;
00073 };
00074
00075 void RTCPMemberDatabase::reapOldMembers(unsigned threshold) {
00076 Boolean foundOldMember;
00077 u_int32_t oldSSRC = 0;
00078
00079 do {
00080 foundOldMember = False;
00081
00082 HashTable::Iterator* iter
00083 = HashTable::Iterator::create(*fTable);
00084 uintptr_t timeCount;
00085 char const* key;
00086 while ((timeCount = (uintptr_t)(iter->next(key))) != 0) {
00087 #ifdef DEBUG
00088 fprintf(stderr, "reap: checking SSRC 0x%lx: %ld (threshold %d)\n", (unsigned long)key, timeCount, threshold);
00089 #endif
00090 if (timeCount < (uintptr_t)threshold) {
00091 uintptr_t ssrc = (uintptr_t)key;
00092 oldSSRC = (u_int32_t)ssrc;
00093 foundOldMember = True;
00094 }
00095 }
00096 delete iter;
00097
00098 if (foundOldMember) {
00099 #ifdef DEBUG
00100 fprintf(stderr, "reap: removing SSRC 0x%x\n", oldSSRC);
00101 #endif
00102 fOurRTCPInstance.removeSSRC(oldSSRC, True);
00103 }
00104 } while (foundOldMember);
00105 }
00106
00107
00109
00110 static double dTimeNow() {
00111 struct timeval timeNow;
00112 gettimeofday(&timeNow, NULL);
00113 return (double) (timeNow.tv_sec + timeNow.tv_usec/1000000.0);
00114 }
00115
00116 static unsigned const maxRTCPPacketSize = 1450;
00117
00118 static unsigned const preferredPacketSize = 1000;
00119
00120 RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs,
00121 unsigned totSessionBW,
00122 unsigned char const* cname,
00123 RTPSink* sink, RTPSource const* source,
00124 Boolean isSSMSource)
00125 : Medium(env), fRTCPInterface(this, RTCPgs), fTotSessionBW(totSessionBW),
00126 fSink(sink), fSource(source), fIsSSMSource(isSSMSource),
00127 fCNAME(RTCP_SDES_CNAME, cname), fOutgoingReportCount(1),
00128 fAveRTCPSize(0), fIsInitial(1), fPrevNumMembers(0),
00129 fLastSentSize(0), fLastReceivedSize(0), fLastReceivedSSRC(0),
00130 fTypeOfEvent(EVENT_UNKNOWN), fTypeOfPacket(PACKET_UNKNOWN_TYPE),
00131 fHaveJustSentPacket(False), fLastPacketSentSize(0),
00132 fByeHandlerTask(NULL), fByeHandlerClientData(NULL),
00133 fSRHandlerTask(NULL), fSRHandlerClientData(NULL),
00134 fRRHandlerTask(NULL), fRRHandlerClientData(NULL),
00135 fSpecificRRHandlerTable(NULL) {
00136 #ifdef DEBUG
00137 fprintf(stderr, "RTCPInstance[%p]::RTCPInstance()\n", this);
00138 #endif
00139 if (fTotSessionBW == 0) {
00140 env << "RTCPInstance::RTCPInstance error: totSessionBW parameter should not be zero!\n";
00141 fTotSessionBW = 1;
00142 }
00143
00144 if (isSSMSource) RTCPgs->multicastSendOnly();
00145
00146 double timeNow = dTimeNow();
00147 fPrevReportTime = fNextReportTime = timeNow;
00148
00149 fKnownMembers = new RTCPMemberDatabase(*this);
00150 fInBuf = new unsigned char[maxRTCPPacketSize];
00151 if (fKnownMembers == NULL || fInBuf == NULL) return;
00152 fNumBytesAlreadyRead = 0;
00153
00154
00155 unsigned savedMaxSize = OutPacketBuffer::maxSize;
00156 OutPacketBuffer::maxSize = maxRTCPPacketSize;
00157 fOutBuf = new OutPacketBuffer(preferredPacketSize, maxRTCPPacketSize);
00158 OutPacketBuffer::maxSize = savedMaxSize;
00159 if (fOutBuf == NULL) return;
00160
00161
00162 TaskScheduler::BackgroundHandlerProc* handler
00163 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00164 fRTCPInterface.startNetworkReading(handler);
00165
00166
00167 fTypeOfEvent = EVENT_REPORT;
00168 onExpire(this);
00169 }
00170
00171 struct RRHandlerRecord {
00172 TaskFunc* rrHandlerTask;
00173 void* rrHandlerClientData;
00174 };
00175
00176 RTCPInstance::~RTCPInstance() {
00177 #ifdef DEBUG
00178 fprintf(stderr, "RTCPInstance[%p]::~RTCPInstance()\n", this);
00179 #endif
00180
00181 fRTCPInterface.stopNetworkReading();
00182
00183
00184
00185 fTypeOfEvent = EVENT_BYE;
00186 sendBYE();
00187
00188 if (fSpecificRRHandlerTable != NULL) {
00189 AddressPortLookupTable::Iterator iter(*fSpecificRRHandlerTable);
00190 RRHandlerRecord* rrHandler;
00191 while ((rrHandler = (RRHandlerRecord*)iter.next()) != NULL) {
00192 delete rrHandler;
00193 }
00194 delete fSpecificRRHandlerTable;
00195 }
00196
00197 delete fKnownMembers;
00198 delete fOutBuf;
00199 delete[] fInBuf;
00200 }
00201
00202 RTCPInstance* RTCPInstance::createNew(UsageEnvironment& env, Groupsock* RTCPgs,
00203 unsigned totSessionBW,
00204 unsigned char const* cname,
00205 RTPSink* sink, RTPSource const* source,
00206 Boolean isSSMSource) {
00207 return new RTCPInstance(env, RTCPgs, totSessionBW, cname, sink, source,
00208 isSSMSource);
00209 }
00210
00211 Boolean RTCPInstance::lookupByName(UsageEnvironment& env,
00212 char const* instanceName,
00213 RTCPInstance*& resultInstance) {
00214 resultInstance = NULL;
00215
00216 Medium* medium;
00217 if (!Medium::lookupByName(env, instanceName, medium)) return False;
00218
00219 if (!medium->isRTCPInstance()) {
00220 env.setResultMsg(instanceName, " is not a RTCP instance");
00221 return False;
00222 }
00223
00224 resultInstance = (RTCPInstance*)medium;
00225 return True;
00226 }
00227
00228 Boolean RTCPInstance::isRTCPInstance() const {
00229 return True;
00230 }
00231
00232 unsigned RTCPInstance::numMembers() const {
00233 if (fKnownMembers == NULL) return 0;
00234
00235 return fKnownMembers->numMembers();
00236 }
00237
00238 void RTCPInstance::setByeHandler(TaskFunc* handlerTask, void* clientData,
00239 Boolean handleActiveParticipantsOnly) {
00240 fByeHandlerTask = handlerTask;
00241 fByeHandlerClientData = clientData;
00242 fByeHandleActiveParticipantsOnly = handleActiveParticipantsOnly;
00243 }
00244
00245 void RTCPInstance::setSRHandler(TaskFunc* handlerTask, void* clientData) {
00246 fSRHandlerTask = handlerTask;
00247 fSRHandlerClientData = clientData;
00248 }
00249
00250 void RTCPInstance::setRRHandler(TaskFunc* handlerTask, void* clientData) {
00251 fRRHandlerTask = handlerTask;
00252 fRRHandlerClientData = clientData;
00253 }
00254
00255 void RTCPInstance
00256 ::setSpecificRRHandler(netAddressBits fromAddress, Port fromPort,
00257 TaskFunc* handlerTask, void* clientData) {
00258 if (handlerTask == NULL && clientData == NULL) {
00259 unsetSpecificRRHandler(fromAddress, fromPort);
00260 return;
00261 }
00262
00263 RRHandlerRecord* rrHandler = new RRHandlerRecord;
00264 rrHandler->rrHandlerTask = handlerTask;
00265 rrHandler->rrHandlerClientData = clientData;
00266 if (fSpecificRRHandlerTable == NULL) {
00267 fSpecificRRHandlerTable = new AddressPortLookupTable;
00268 }
00269 fSpecificRRHandlerTable->Add(fromAddress, (~0), fromPort, rrHandler);
00270 }
00271
00272 void RTCPInstance
00273 ::unsetSpecificRRHandler(netAddressBits fromAddress, Port fromPort) {
00274 if (fSpecificRRHandlerTable == NULL) return;
00275
00276 RRHandlerRecord* rrHandler
00277 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddress, (~0), fromPort));
00278 if (rrHandler != NULL) {
00279 fSpecificRRHandlerTable->Remove(fromAddress, (~0), fromPort);
00280 delete rrHandler;
00281 }
00282 }
00283
00284 void RTCPInstance::setStreamSocket(int sockNum,
00285 unsigned char streamChannelId) {
00286
00287 fRTCPInterface.stopNetworkReading();
00288
00289
00290 fRTCPInterface.setStreamSocket(sockNum, streamChannelId);
00291
00292
00293 TaskScheduler::BackgroundHandlerProc* handler
00294 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00295 fRTCPInterface.startNetworkReading(handler);
00296 }
00297
00298 void RTCPInstance::addStreamSocket(int sockNum,
00299 unsigned char streamChannelId) {
00300
00301 envir().taskScheduler().turnOffBackgroundReadHandling(fRTCPInterface.gs()->socketNum());
00302
00303
00304 fRTCPInterface.addStreamSocket(sockNum, streamChannelId);
00305
00306
00307 TaskScheduler::BackgroundHandlerProc* handler
00308 = (TaskScheduler::BackgroundHandlerProc*)&incomingReportHandler;
00309 fRTCPInterface.startNetworkReading(handler);
00310 }
00311
00312 static unsigned const IP_UDP_HDR_SIZE = 28;
00313
00314
00315 #define ADVANCE(n) pkt += (n); packetSize -= (n)
00316
00317 void RTCPInstance::incomingReportHandler(RTCPInstance* instance,
00318 int ) {
00319 instance->incomingReportHandler1();
00320 }
00321
00322 void RTCPInstance::incomingReportHandler1() {
00323 do {
00324 Boolean callByeHandler = False;
00325 int tcpReadStreamSocketNum = fRTCPInterface.nextTCPReadStreamSocketNum();
00326 unsigned char tcpReadStreamChannelId = fRTCPInterface.nextTCPReadStreamChannelId();
00327 unsigned packetSize = 0;
00328 unsigned numBytesRead;
00329 struct sockaddr_in fromAddress;
00330 Boolean packetReadWasIncomplete;
00331 if (fNumBytesAlreadyRead >= maxRTCPPacketSize) {
00332 envir() << "RTCPInstance error: Hit limit when reading incoming packet over TCP. Increase \"maxRTCPPacketSize\"\n";
00333 break;
00334 }
00335 Boolean readResult
00336 = fRTCPInterface.handleRead(&fInBuf[fNumBytesAlreadyRead], maxRTCPPacketSize - fNumBytesAlreadyRead,
00337 numBytesRead, fromAddress, packetReadWasIncomplete);
00338 if (packetReadWasIncomplete) {
00339 fNumBytesAlreadyRead += numBytesRead;
00340 return;
00341 } else {
00342 packetSize = fNumBytesAlreadyRead + numBytesRead;
00343 fNumBytesAlreadyRead = 0;
00344 }
00345 if (!readResult) break;
00346
00347
00348 Boolean packetWasFromOurHost = False;
00349 if (RTCPgs()->wasLoopedBackFromUs(envir(), fromAddress)) {
00350 packetWasFromOurHost = True;
00351
00352
00353
00354
00355
00356 if (fHaveJustSentPacket && fLastPacketSentSize == packetSize) {
00357
00358 fHaveJustSentPacket = False;
00359 break;
00360 }
00361 }
00362
00363 unsigned char* pkt = fInBuf;
00364 if (fIsSSMSource && !packetWasFromOurHost) {
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380 fRTCPInterface.sendPacket(pkt, packetSize);
00381 fHaveJustSentPacket = True;
00382 fLastPacketSentSize = packetSize;
00383 }
00384
00385 #ifdef DEBUG
00386 fprintf(stderr, "[%p]saw incoming RTCP packet", this);
00387 if (tcpReadStreamSocketNum < 0) {
00388
00389 fprintf(stderr, " (from address %s, port %d)", AddressString(fromAddress).val(), ntohs(fromAddress.sin_port));
00390 }
00391 fprintf(stderr, "\n");
00392 for (unsigned i = 0; i < packetSize; ++i) {
00393 if (i%4 == 0) fprintf(stderr, " ");
00394 fprintf(stderr, "%02x", pkt[i]);
00395 }
00396 fprintf(stderr, "\n");
00397 #endif
00398 int totPacketSize = IP_UDP_HDR_SIZE + packetSize;
00399
00400
00401
00402
00403
00404 if (packetSize < 4) break;
00405 unsigned rtcpHdr = ntohl(*(u_int32_t*)pkt);
00406 if ((rtcpHdr & 0xE0FE0000) != (0x80000000 | (RTCP_PT_SR<<16))) {
00407 #ifdef DEBUG
00408 fprintf(stderr, "rejected bad RTCP packet: header 0x%08x\n", rtcpHdr);
00409 #endif
00410 break;
00411 }
00412
00413
00414
00415 int typeOfPacket = PACKET_UNKNOWN_TYPE;
00416 unsigned reportSenderSSRC = 0;
00417 Boolean packetOK = False;
00418 while (1) {
00419 unsigned rc = (rtcpHdr>>24)&0x1F;
00420 unsigned pt = (rtcpHdr>>16)&0xFF;
00421 unsigned length = 4*(rtcpHdr&0xFFFF);
00422 ADVANCE(4);
00423 if (length > packetSize) break;
00424
00425
00426 if (length < 4) break; length -= 4;
00427 reportSenderSSRC = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00428
00429 Boolean subPacketOK = False;
00430 switch (pt) {
00431 case RTCP_PT_SR: {
00432 #ifdef DEBUG
00433 fprintf(stderr, "SR\n");
00434 #endif
00435 if (length < 20) break; length -= 20;
00436
00437
00438 unsigned NTPmsw = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00439 unsigned NTPlsw = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00440 unsigned rtpTimestamp = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00441 if (fSource != NULL) {
00442 RTPReceptionStatsDB& receptionStats
00443 = fSource->receptionStatsDB();
00444 receptionStats.noteIncomingSR(reportSenderSSRC,
00445 NTPmsw, NTPlsw, rtpTimestamp);
00446 }
00447 ADVANCE(8);
00448
00449
00450 if (fSRHandlerTask != NULL) (*fSRHandlerTask)(fSRHandlerClientData);
00451
00452
00453 }
00454 case RTCP_PT_RR: {
00455 #ifdef DEBUG
00456 fprintf(stderr, "RR\n");
00457 #endif
00458 unsigned reportBlocksSize = rc*(6*4);
00459 if (length < reportBlocksSize) break;
00460 length -= reportBlocksSize;
00461
00462 if (fSink != NULL) {
00463
00464 RTPTransmissionStatsDB& transmissionStats = fSink->transmissionStatsDB();
00465 for (unsigned i = 0; i < rc; ++i) {
00466 unsigned senderSSRC = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00467
00468 if (senderSSRC == fSink->SSRC()) {
00469 unsigned lossStats = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00470 unsigned highestReceived = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00471 unsigned jitter = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00472 unsigned timeLastSR = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00473 unsigned timeSinceLastSR = ntohl(*(u_int32_t*)pkt); ADVANCE(4);
00474 transmissionStats.noteIncomingRR(reportSenderSSRC, fromAddress,
00475 lossStats,
00476 highestReceived, jitter,
00477 timeLastSR, timeSinceLastSR);
00478 } else {
00479 ADVANCE(4*5);
00480 }
00481 }
00482 } else {
00483 ADVANCE(reportBlocksSize);
00484 }
00485
00486 if (pt == RTCP_PT_RR) {
00487
00488
00489
00490 if (fSpecificRRHandlerTable != NULL) {
00491 netAddressBits fromAddr;
00492 portNumBits fromPortNum;
00493 if (tcpReadStreamSocketNum < 0) {
00494
00495 fromAddr = fromAddress.sin_addr.s_addr;
00496 fromPortNum = ntohs(fromAddress.sin_port);
00497 } else {
00498
00499
00500 fromAddr = tcpReadStreamSocketNum;
00501 fromPortNum = tcpReadStreamChannelId;
00502 }
00503 Port fromPort(fromPortNum);
00504 RRHandlerRecord* rrHandler
00505 = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort));
00506 if (rrHandler != NULL) {
00507 if (rrHandler->rrHandlerTask != NULL) {
00508 (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
00509 }
00510 }
00511 }
00512
00513
00514 if (fRRHandlerTask != NULL) (*fRRHandlerTask)(fRRHandlerClientData);
00515 }
00516
00517 subPacketOK = True;
00518 typeOfPacket = PACKET_RTCP_REPORT;
00519 break;
00520 }
00521 case RTCP_PT_BYE: {
00522 #ifdef DEBUG
00523 fprintf(stderr, "BYE\n");
00524 #endif
00525
00526
00527 if (fByeHandlerTask != NULL
00528 && (!fByeHandleActiveParticipantsOnly
00529 || (fSource != NULL
00530 && fSource->receptionStatsDB().lookup(reportSenderSSRC) != NULL)
00531 || (fSink != NULL
00532 && fSink->transmissionStatsDB().lookup(reportSenderSSRC) != NULL))) {
00533 callByeHandler = True;
00534 }
00535
00536
00537
00538 subPacketOK = True;
00539 typeOfPacket = PACKET_BYE;
00540 break;
00541 }
00542
00543 default:
00544 #ifdef DEBUG
00545 fprintf(stderr, "UNSUPPORTED TYPE(0x%x)\n", pt);
00546 #endif
00547 subPacketOK = True;
00548 break;
00549 }
00550 if (!subPacketOK) break;
00551
00552
00553
00554 #ifdef DEBUG
00555 fprintf(stderr, "validated RTCP subpacket (type %d): %d, %d, %d, 0x%08x\n", typeOfPacket, rc, pt, length, reportSenderSSRC);
00556 #endif
00557
00558
00559 ADVANCE(length);
00560
00561
00562 if (packetSize == 0) {
00563 packetOK = True;
00564 break;
00565 } else if (packetSize < 4) {
00566 #ifdef DEBUG
00567 fprintf(stderr, "extraneous %d bytes at end of RTCP packet!\n", packetSize);
00568 #endif
00569 break;
00570 }
00571 rtcpHdr = ntohl(*(u_int32_t*)pkt);
00572 if ((rtcpHdr & 0xC0000000) != 0x80000000) {
00573 #ifdef DEBUG
00574 fprintf(stderr, "bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00575 #endif
00576 break;
00577 }
00578 }
00579
00580 if (!packetOK) {
00581 #ifdef DEBUG
00582 fprintf(stderr, "rejected bad RTCP subpacket: header 0x%08x\n", rtcpHdr);
00583 #endif
00584 break;
00585 } else {
00586 #ifdef DEBUG
00587 fprintf(stderr, "validated entire RTCP packet\n");
00588 #endif
00589 }
00590
00591 onReceive(typeOfPacket, totPacketSize, reportSenderSSRC);
00592
00593
00594 if (callByeHandler && fByeHandlerTask != NULL) {
00595 TaskFunc* byeHandler = fByeHandlerTask;
00596 fByeHandlerTask = NULL;
00597 (*byeHandler)(fByeHandlerClientData);
00598 }
00599 } while (0);
00600 }
00601
00602 void RTCPInstance::onReceive(int typeOfPacket, int totPacketSize,
00603 unsigned ssrc) {
00604 fTypeOfPacket = typeOfPacket;
00605 fLastReceivedSize = totPacketSize;
00606 fLastReceivedSSRC = ssrc;
00607
00608 int members = (int)numMembers();
00609 int senders = (fSink != NULL) ? 1 : 0;
00610
00611 OnReceive(this,
00612 this,
00613 &members,
00614 &fPrevNumMembers,
00615 &senders,
00616 &fAveRTCPSize,
00617 &fPrevReportTime,
00618 dTimeNow(),
00619 fNextReportTime);
00620 }
00621
00622 void RTCPInstance::sendReport() {
00623 #ifdef DEBUG
00624 fprintf(stderr, "sending REPORT\n");
00625 #endif
00626
00627 if (!addReport()) return;
00628
00629
00630 addSDES();
00631
00632
00633 sendBuiltPacket();
00634
00635
00636 const unsigned membershipReapPeriod = 5;
00637 if ((++fOutgoingReportCount) % membershipReapPeriod == 0) {
00638 unsigned threshold = fOutgoingReportCount - membershipReapPeriod;
00639 fKnownMembers->reapOldMembers(threshold);
00640 }
00641 }
00642
00643 void RTCPInstance::sendBYE() {
00644 #ifdef DEBUG
00645 fprintf(stderr, "sending BYE\n");
00646 #endif
00647
00648 (void)addReport(True);
00649
00650 addBYE();
00651 sendBuiltPacket();
00652 }
00653
00654 void RTCPInstance::sendBuiltPacket() {
00655 #ifdef DEBUG
00656 fprintf(stderr, "sending RTCP packet\n");
00657 unsigned char* p = fOutBuf->packet();
00658 for (unsigned i = 0; i < fOutBuf->curPacketSize(); ++i) {
00659 if (i%4 == 0) fprintf(stderr," ");
00660 fprintf(stderr, "%02x", p[i]);
00661 }
00662 fprintf(stderr, "\n");
00663 #endif
00664 unsigned reportSize = fOutBuf->curPacketSize();
00665 fRTCPInterface.sendPacket(fOutBuf->packet(), reportSize);
00666 fOutBuf->resetOffset();
00667
00668 fLastSentSize = IP_UDP_HDR_SIZE + reportSize;
00669 fHaveJustSentPacket = True;
00670 fLastPacketSentSize = reportSize;
00671 }
00672
00673 int RTCPInstance::checkNewSSRC() {
00674 return fKnownMembers->noteMembership(fLastReceivedSSRC,
00675 fOutgoingReportCount);
00676 }
00677
00678 void RTCPInstance::removeLastReceivedSSRC() {
00679 removeSSRC(fLastReceivedSSRC, False);
00680 }
00681
00682 void RTCPInstance::removeSSRC(u_int32_t ssrc, Boolean alsoRemoveStats) {
00683 fKnownMembers->remove(ssrc);
00684
00685 if (alsoRemoveStats) {
00686
00687 if (fSource != NULL) fSource->receptionStatsDB().removeRecord(ssrc);
00688 if (fSink != NULL) fSink->transmissionStatsDB().removeRecord(ssrc);
00689 }
00690 }
00691
00692 void RTCPInstance::onExpire(RTCPInstance* instance) {
00693 instance->onExpire1();
00694 }
00695
00696
00697
00698 Boolean RTCPInstance::addReport(Boolean alwaysAdd) {
00699
00700 if (fSink != NULL) {
00701 if (!alwaysAdd) {
00702 if (!fSink->enableRTCPReports()) return False;
00703
00704
00705
00706
00707 if (fSink->nextTimestampHasBeenPreset()) return False;
00708 }
00709
00710 addSR();
00711 } else if (fSource != NULL) {
00712 addRR();
00713 }
00714
00715 return True;
00716 }
00717
00718 void RTCPInstance::addSR() {
00719
00720
00721 enqueueCommonReportPrefix(RTCP_PT_SR, fSink->SSRC(),
00722 5 );
00723
00724
00725
00726
00727 struct timeval timeNow;
00728 gettimeofday(&timeNow, NULL);
00729 fOutBuf->enqueueWord(timeNow.tv_sec + 0x83AA7E80);
00730
00731 double fractionalPart = (timeNow.tv_usec/15625.0)*0x04000000;
00732 fOutBuf->enqueueWord((unsigned)(fractionalPart+0.5));
00733
00734 unsigned rtpTimestamp = fSink->convertToRTPTimestamp(timeNow);
00735 fOutBuf->enqueueWord(rtpTimestamp);
00736
00737
00738 fOutBuf->enqueueWord(fSink->packetCount());
00739 fOutBuf->enqueueWord(fSink->octetCount());
00740
00741 enqueueCommonReportSuffix();
00742 }
00743
00744 void RTCPInstance::addRR() {
00745
00746
00747 enqueueCommonReportPrefix(RTCP_PT_RR, fSource->SSRC());
00748 enqueueCommonReportSuffix();
00749 }
00750
00751 void RTCPInstance::enqueueCommonReportPrefix(unsigned char packetType,
00752 unsigned SSRC,
00753 unsigned numExtraWords) {
00754 unsigned numReportingSources;
00755 if (fSource == NULL) {
00756 numReportingSources = 0;
00757 } else {
00758 RTPReceptionStatsDB& allReceptionStats
00759 = fSource->receptionStatsDB();
00760 numReportingSources = allReceptionStats.numActiveSourcesSinceLastReset();
00761
00762 if (numReportingSources >= 32) { numReportingSources = 32; }
00763
00764 }
00765
00766 unsigned rtcpHdr = 0x80000000;
00767 rtcpHdr |= (numReportingSources<<24);
00768 rtcpHdr |= (packetType<<16);
00769 rtcpHdr |= (1 + numExtraWords + 6*numReportingSources);
00770
00771 fOutBuf->enqueueWord(rtcpHdr);
00772
00773 fOutBuf->enqueueWord(SSRC);
00774 }
00775
00776 void RTCPInstance::enqueueCommonReportSuffix() {
00777
00778 if (fSource != NULL) {
00779 RTPReceptionStatsDB& allReceptionStats
00780 = fSource->receptionStatsDB();
00781
00782 RTPReceptionStatsDB::Iterator iterator(allReceptionStats);
00783 while (1) {
00784 RTPReceptionStats* receptionStats = iterator.next();
00785 if (receptionStats == NULL) break;
00786 enqueueReportBlock(receptionStats);
00787 }
00788
00789 allReceptionStats.reset();
00790 }
00791 }
00792
00793 void
00794 RTCPInstance::enqueueReportBlock(RTPReceptionStats* stats) {
00795 fOutBuf->enqueueWord(stats->SSRC());
00796
00797 unsigned highestExtSeqNumReceived = stats->highestExtSeqNumReceived();
00798
00799 unsigned totNumExpected
00800 = highestExtSeqNumReceived - stats->baseExtSeqNumReceived();
00801 int totNumLost = totNumExpected - stats->totNumPacketsReceived();
00802
00803 if (totNumLost > 0x007FFFFF) {
00804 totNumLost = 0x007FFFFF;
00805 } else if (totNumLost < 0) {
00806 if (totNumLost < -0x00800000) totNumLost = 0x00800000;
00807 totNumLost &= 0x00FFFFFF;
00808 }
00809
00810 unsigned numExpectedSinceLastReset
00811 = highestExtSeqNumReceived - stats->lastResetExtSeqNumReceived();
00812 int numLostSinceLastReset
00813 = numExpectedSinceLastReset - stats->numPacketsReceivedSinceLastReset();
00814 unsigned char lossFraction;
00815 if (numExpectedSinceLastReset == 0 || numLostSinceLastReset < 0) {
00816 lossFraction = 0;
00817 } else {
00818 lossFraction = (unsigned char)
00819 ((numLostSinceLastReset << 8) / numExpectedSinceLastReset);
00820 }
00821
00822 fOutBuf->enqueueWord((lossFraction<<24) | totNumLost);
00823 fOutBuf->enqueueWord(highestExtSeqNumReceived);
00824
00825 fOutBuf->enqueueWord(stats->jitter());
00826
00827 unsigned NTPmsw = stats->lastReceivedSR_NTPmsw();
00828 unsigned NTPlsw = stats->lastReceivedSR_NTPlsw();
00829 unsigned LSR = ((NTPmsw&0xFFFF)<<16)|(NTPlsw>>16);
00830 fOutBuf->enqueueWord(LSR);
00831
00832
00833 struct timeval const& LSRtime = stats->lastReceivedSR_time();
00834 struct timeval timeNow, timeSinceLSR;
00835 gettimeofday(&timeNow, NULL);
00836 if (timeNow.tv_usec < LSRtime.tv_usec) {
00837 timeNow.tv_usec += 1000000;
00838 timeNow.tv_sec -= 1;
00839 }
00840 timeSinceLSR.tv_sec = timeNow.tv_sec - LSRtime.tv_sec;
00841 timeSinceLSR.tv_usec = timeNow.tv_usec - LSRtime.tv_usec;
00842
00843
00844 unsigned DLSR;
00845 if (LSR == 0) {
00846 DLSR = 0;
00847 } else {
00848 DLSR = (timeSinceLSR.tv_sec<<16)
00849 | ( (((timeSinceLSR.tv_usec<<11)+15625)/31250) & 0xFFFF);
00850 }
00851 fOutBuf->enqueueWord(DLSR);
00852 }
00853
00854 void RTCPInstance::addSDES() {
00855
00856
00857
00858 unsigned numBytes = 4;
00859
00860 numBytes += fCNAME.totalSize();
00861 numBytes += 1;
00862
00863 unsigned num4ByteWords = (numBytes + 3)/4;
00864
00865 unsigned rtcpHdr = 0x81000000;
00866 rtcpHdr |= (RTCP_PT_SDES<<16);
00867 rtcpHdr |= num4ByteWords;
00868 fOutBuf->enqueueWord(rtcpHdr);
00869
00870 if (fSource != NULL) {
00871 fOutBuf->enqueueWord(fSource->SSRC());
00872 } else if (fSink != NULL) {
00873 fOutBuf->enqueueWord(fSink->SSRC());
00874 }
00875
00876
00877 fOutBuf->enqueue(fCNAME.data(), fCNAME.totalSize());
00878
00879
00880 unsigned numPaddingBytesNeeded = 4 - (fOutBuf->curPacketSize() % 4);
00881 unsigned char const zero = '\0';
00882 while (numPaddingBytesNeeded-- > 0) fOutBuf->enqueue(&zero, 1);
00883 }
00884
00885 void RTCPInstance::addBYE() {
00886 unsigned rtcpHdr = 0x81000000;
00887 rtcpHdr |= (RTCP_PT_BYE<<16);
00888 rtcpHdr |= 1;
00889 fOutBuf->enqueueWord(rtcpHdr);
00890
00891 if (fSource != NULL) {
00892 fOutBuf->enqueueWord(fSource->SSRC());
00893 } else if (fSink != NULL) {
00894 fOutBuf->enqueueWord(fSink->SSRC());
00895 }
00896 }
00897
00898 void RTCPInstance::schedule(double nextTime) {
00899 fNextReportTime = nextTime;
00900
00901 double secondsToDelay = nextTime - dTimeNow();
00902 if (secondsToDelay < 0) secondsToDelay = 0;
00903 #ifdef DEBUG
00904 fprintf(stderr, "schedule(%f->%f)\n", secondsToDelay, nextTime);
00905 #endif
00906 int64_t usToGo = (int64_t)(secondsToDelay * 1000000);
00907 nextTask() = envir().taskScheduler().scheduleDelayedTask(usToGo,
00908 (TaskFunc*)RTCPInstance::onExpire, this);
00909 }
00910
00911 void RTCPInstance::reschedule(double nextTime) {
00912 envir().taskScheduler().unscheduleDelayedTask(nextTask());
00913 schedule(nextTime);
00914 }
00915
00916 void RTCPInstance::onExpire1() {
00917
00918 double rtcpBW = 0.05*fTotSessionBW*1024/8;
00919
00920 OnExpire(this,
00921 numMembers(),
00922 (fSink != NULL) ? 1 : 0,
00923 rtcpBW,
00924 (fSink != NULL) ? 1 : 0,
00925 &fAveRTCPSize,
00926 &fIsInitial,
00927 dTimeNow(),
00928 &fPrevReportTime,
00929 &fPrevNumMembers
00930 );
00931 }
00932
00934
00935 SDESItem::SDESItem(unsigned char tag, unsigned char const* value) {
00936 unsigned length = strlen((char const*)value);
00937 if (length > 0xFF) length = 0xFF;
00938
00939 fData[0] = tag;
00940 fData[1] = (unsigned char)length;
00941 memmove(&fData[2], value, length);
00942 }
00943
00944 unsigned SDESItem::totalSize() const {
00945 return 2 + (unsigned)fData[1];
00946 }
00947
00948
00950
00951 extern "C" void Schedule(double nextTime, event e) {
00952 RTCPInstance* instance = (RTCPInstance*)e;
00953 if (instance == NULL) return;
00954
00955 instance->schedule(nextTime);
00956 }
00957
00958 extern "C" void Reschedule(double nextTime, event e) {
00959 RTCPInstance* instance = (RTCPInstance*)e;
00960 if (instance == NULL) return;
00961
00962 instance->reschedule(nextTime);
00963 }
00964
00965 extern "C" void SendRTCPReport(event e) {
00966 RTCPInstance* instance = (RTCPInstance*)e;
00967 if (instance == NULL) return;
00968
00969 instance->sendReport();
00970 }
00971
00972 extern "C" void SendBYEPacket(event e) {
00973 RTCPInstance* instance = (RTCPInstance*)e;
00974 if (instance == NULL) return;
00975
00976 instance->sendBYE();
00977 }
00978
00979 extern "C" int TypeOfEvent(event e) {
00980 RTCPInstance* instance = (RTCPInstance*)e;
00981 if (instance == NULL) return EVENT_UNKNOWN;
00982
00983 return instance->typeOfEvent();
00984 }
00985
00986 extern "C" int SentPacketSize(event e) {
00987 RTCPInstance* instance = (RTCPInstance*)e;
00988 if (instance == NULL) return 0;
00989
00990 return instance->sentPacketSize();
00991 }
00992
00993 extern "C" int PacketType(packet p) {
00994 RTCPInstance* instance = (RTCPInstance*)p;
00995 if (instance == NULL) return PACKET_UNKNOWN_TYPE;
00996
00997 return instance->packetType();
00998 }
00999
01000 extern "C" int ReceivedPacketSize(packet p) {
01001 RTCPInstance* instance = (RTCPInstance*)p;
01002 if (instance == NULL) return 0;
01003
01004 return instance->receivedPacketSize();
01005 }
01006
01007 extern "C" int NewMember(packet p) {
01008 RTCPInstance* instance = (RTCPInstance*)p;
01009 if (instance == NULL) return 0;
01010
01011 return instance->checkNewSSRC();
01012 }
01013
01014 extern "C" int NewSender(packet ) {
01015 return 0;
01016 }
01017
01018 extern "C" void AddMember(packet ) {
01019
01020 }
01021
01022 extern "C" void AddSender(packet ) {
01023
01024 }
01025
01026 extern "C" void RemoveMember(packet p) {
01027 RTCPInstance* instance = (RTCPInstance*)p;
01028 if (instance == NULL) return;
01029
01030 instance->removeLastReceivedSSRC();
01031 }
01032
01033 extern "C" void RemoveSender(packet ) {
01034
01035 }
01036
01037 extern "C" double drand30() {
01038 unsigned tmp = our_random()&0x3FFFFFFF;
01039 return tmp/(double)(1024*1024*1024);
01040 }