00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
00038 _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
00039 if (ourTables == NULL) return NULL;
00040
00041 if (ourTables->socketTable == NULL) {
00042
00043 ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00044 }
00045 return (HashTable*)(ourTables->socketTable);
00046 }
00047
00048 class SocketDescriptor {
00049 public:
00050 SocketDescriptor(UsageEnvironment& env, int socketNum);
00051 virtual ~SocketDescriptor();
00052
00053 void registerRTPInterface(unsigned char streamChannelId,
00054 RTPInterface* rtpInterface);
00055 RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00056 void deregisterRTPInterface(unsigned char streamChannelId);
00057
00058 void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
00059 fServerRequestAlternativeByteHandler = handler;
00060 fServerRequestAlternativeByteHandlerClientData = clientData;
00061 }
00062
00063 private:
00064 static void tcpReadHandler(SocketDescriptor*, int mask);
00065 Boolean tcpReadHandler1(int mask);
00066
00067 private:
00068 UsageEnvironment& fEnv;
00069 int fOurSocketNum;
00070 HashTable* fSubChannelHashTable;
00071 ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
00072 void* fServerRequestAlternativeByteHandlerClientData;
00073 u_int8_t fStreamChannelId, fSizeByte1;
00074 Boolean fReadErrorOccurred, fDeleteNext;
00075 enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
00076 };
00077
00078 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
00079 HashTable* table = socketHashTable(env, createIfNotFound);
00080 if (table == NULL) return NULL;
00081
00082 char const* key = (char const*)(long)sockNum;
00083 SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
00084 if (socketDescriptor == NULL) {
00085 if (createIfNotFound) {
00086 socketDescriptor = new SocketDescriptor(env, sockNum);
00087 table->Add((char const*)(long)(sockNum), socketDescriptor);
00088 } else if (table->IsEmpty()) {
00089
00090 _Tables* ourTables = _Tables::getOurTables(env);
00091 delete table;
00092 ourTables->socketTable = NULL;
00093 ourTables->reclaimIfPossible();
00094 }
00095 }
00096
00097 return socketDescriptor;
00098 }
00099
00100 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00101 char const* key = (char const*)(long)sockNum;
00102 HashTable* table = socketHashTable(env);
00103 table->Remove(key);
00104
00105 if (table->IsEmpty()) {
00106
00107 _Tables* ourTables = _Tables::getOurTables(env);
00108 delete table;
00109 ourTables->socketTable = NULL;
00110 ourTables->reclaimIfPossible();
00111 }
00112 }
00113
00114
00116
00117 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00118 : fOwner(owner), fGS(gs),
00119 fTCPStreams(NULL),
00120 fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00121 fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
00122 fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00123
00124
00125
00126
00127 makeSocketNonBlocking(fGS->socketNum());
00128 increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
00129 }
00130
00131 RTPInterface::~RTPInterface() {
00132 delete fTCPStreams;
00133 }
00134
00135 void RTPInterface::setStreamSocket(int sockNum,
00136 unsigned char streamChannelId) {
00137 fGS->removeAllDestinations();
00138 addStreamSocket(sockNum, streamChannelId);
00139 }
00140
00141 void RTPInterface::addStreamSocket(int sockNum,
00142 unsigned char streamChannelId) {
00143 if (sockNum < 0) return;
00144
00145 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00146 streams = streams->fNext) {
00147 if (streams->fStreamSocketNum == sockNum
00148 && streams->fStreamChannelId == streamChannelId) {
00149 return;
00150 }
00151 }
00152
00153 fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00154
00155
00156 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), sockNum);
00157 socketDescriptor->registerRTPInterface(streamChannelId, this);
00158 }
00159
00160 static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
00161 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
00162 if (socketDescriptor != NULL) {
00163 socketDescriptor->deregisterRTPInterface(streamChannelId);
00164
00165
00166 }
00167 }
00168
00169 void RTPInterface::removeStreamSocket(int sockNum,
00170 unsigned char streamChannelId) {
00171 for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00172 streamsPtr = &((*streamsPtr)->fNext)) {
00173 if ((*streamsPtr)->fStreamSocketNum == sockNum
00174 && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00175 deregisterSocket(envir(), sockNum, streamChannelId);
00176
00177
00178 tcpStreamRecord* next = (*streamsPtr)->fNext;
00179 (*streamsPtr)->fNext = NULL;
00180 delete (*streamsPtr);
00181 *streamsPtr = next;
00182 return;
00183 }
00184 }
00185 }
00186
00187 void RTPInterface
00188 ::setServerRequestAlternativeByteHandler(int socketNum, ServerRequestAlternativeByteHandler* handler, void* clientData) {
00189 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), socketNum);
00190
00191 if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
00192 }
00193
00194
00195 Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00196 Boolean success = True;
00197
00198
00199 if (!fGS->output(envir(), fGS->ttl(), packet, packetSize)) success = False;
00200
00201
00202 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00203 streams = streams->fNext) {
00204 if (!sendRTPorRTCPPacketOverTCP(packet, packetSize,
00205 streams->fStreamSocketNum, streams->fStreamChannelId)) {
00206 success = False;
00207 }
00208 }
00209
00210 return success;
00211 }
00212
00213 void RTPInterface
00214 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00215
00216 envir().taskScheduler().
00217 turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00218
00219
00220 fReadHandlerProc = handlerProc;
00221 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00222 streams = streams->fNext) {
00223
00224 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00225
00226
00227 socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00228 }
00229 }
00230
00231 Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00232 unsigned& bytesRead, struct sockaddr_in& fromAddress, Boolean& packetReadWasIncomplete) {
00233 packetReadWasIncomplete = False;
00234 Boolean readSuccess;
00235 if (fNextTCPReadStreamSocketNum < 0) {
00236
00237 readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00238 } else {
00239
00240 bytesRead = 0;
00241 unsigned totBytesToRead = fNextTCPReadSize;
00242 if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
00243 unsigned curBytesToRead = totBytesToRead;
00244 int curBytesRead;
00245 while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00246 &buffer[bytesRead], curBytesToRead,
00247 fromAddress)) > 0) {
00248 bytesRead += curBytesRead;
00249 if (bytesRead >= totBytesToRead) break;
00250 curBytesToRead -= curBytesRead;
00251 }
00252 fNextTCPReadSize -= bytesRead;
00253 if (fNextTCPReadSize == 0) {
00254
00255 readSuccess = True;
00256 } else if (curBytesRead < 0) {
00257
00258 bytesRead = 0;
00259 readSuccess = False;
00260 } else {
00261
00262 packetReadWasIncomplete = True;
00263 return True;
00264 }
00265 fNextTCPReadStreamSocketNum = -1;
00266 }
00267
00268 if (readSuccess && fAuxReadHandlerFunc != NULL) {
00269
00270 (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00271 }
00272 return readSuccess;
00273 }
00274
00275 void RTPInterface::stopNetworkReading() {
00276
00277 envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00278
00279
00280 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00281 streams = streams->fNext) {
00282 deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
00283 }
00284 }
00285
00286
00288
00289 Boolean RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize,
00290 int socketNum, unsigned char streamChannelId) {
00291 #ifdef DEBUG_SEND
00292 fprintf(stderr, "sendRTPorRTCPPacketOverTCP: %d bytes over channel %d (socket %d)\n",
00293 packetSize, streamChannelId, socketNum); fflush(stderr);
00294 #endif
00295
00296
00297
00298
00299 do {
00300 u_int8_t framingHeader[4];
00301 framingHeader[0] = '$';
00302 framingHeader[1] = streamChannelId;
00303 framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8);
00304 framingHeader[3] = (u_int8_t) (packetSize&0xFF);
00305 if (!sendDataOverTCP(socketNum, framingHeader, 4, False)) break;
00306
00307 if (!sendDataOverTCP(socketNum, packet, packetSize, True)) break;
00308 #ifdef DEBUG_SEND
00309 fprintf(stderr, "sendRTPorRTCPPacketOverTCP: completed\n"); fflush(stderr);
00310 #endif
00311
00312 return True;
00313 } while (0);
00314
00315 #ifdef DEBUG_SEND
00316 fprintf(stderr, "sendRTPorRTCPPacketOverTCP: failed! (errno %d)\n", envir().getErrno()); fflush(stderr);
00317 #endif
00318 return False;
00319 }
00320
00321 Boolean RTPInterface::sendDataOverTCP(int socketNum, u_int8_t const* data, unsigned dataSize, Boolean forceSendToSucceed) {
00322 if (send(socketNum, (char const*)data, dataSize, 0) != (int)dataSize) {
00323
00324
00325 if (forceSendToSucceed && envir().getErrno() == EAGAIN) {
00326
00327
00328 #ifdef DEBUG_SEND
00329 fprintf(stderr, "sendDataOverTCP: resending %d-byte send (blocking)\n", dataSize); fflush(stderr);
00330 #endif
00331 makeSocketBlocking(socketNum);
00332 Boolean sendSuccess = send(socketNum, (char const*)data, dataSize, 0) == (int)dataSize;
00333 makeSocketNonBlocking(socketNum);
00334 return sendSuccess;
00335 }
00336 return False;
00337 }
00338
00339 return True;
00340 }
00341
00342 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00343 :fEnv(env), fOurSocketNum(socketNum),
00344 fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
00345 fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
00346 fReadErrorOccurred(False), fDeleteNext(False), fTCPReadingState(AWAITING_DOLLAR) {
00347 }
00348
00349 SocketDescriptor::~SocketDescriptor() {
00350 fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00351 if (fServerRequestAlternativeByteHandler != NULL) {
00352
00353
00354
00355 u_int8_t specialChar = fReadErrorOccurred ? 0xFF : 0xFE;
00356 (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, specialChar);
00357 }
00358 removeSocketDescription(fEnv, fOurSocketNum);
00359
00360 if (fSubChannelHashTable != NULL) {
00361 while (fSubChannelHashTable->RemoveNext() != NULL) {}
00362 delete fSubChannelHashTable;
00363 }
00364 }
00365
00366 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00367 RTPInterface* rtpInterface) {
00368 Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00369 #if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE)
00370 fprintf(stderr, "SocketDescriptor(socket %d)::registerRTPInterface(channel %d): isFirstRegistration %d\n", fOurSocketNum, streamChannelId, isFirstRegistration);
00371 #endif
00372 fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00373 rtpInterface);
00374
00375 if (isFirstRegistration) {
00376
00377 TaskScheduler::BackgroundHandlerProc* handler
00378 = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00379 fEnv.taskScheduler().
00380 setBackgroundHandling(fOurSocketNum, SOCKET_READABLE|SOCKET_EXCEPTION, handler, this);
00381 }
00382 }
00383
00384 RTPInterface* SocketDescriptor
00385 ::lookupRTPInterface(unsigned char streamChannelId) {
00386 char const* lookupArg = (char const*)(long)streamChannelId;
00387 return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00388 }
00389
00390 void SocketDescriptor
00391 ::deregisterRTPInterface(unsigned char streamChannelId) {
00392 #if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE)
00393 fprintf(stderr, "SocketDescriptor(socket %d)::deregisterRTPInterface(channel %d)\n", fOurSocketNum, streamChannelId);
00394 #endif
00395 fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00396
00397 if (fSubChannelHashTable->IsEmpty()) {
00398
00399 fDeleteNext = True;
00400 }
00401 }
00402
00403 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
00404
00405 unsigned count = 2000;
00406 while (!socketDescriptor->fDeleteNext && socketDescriptor->tcpReadHandler1(mask) && --count > 0) {}
00407 if (socketDescriptor->fDeleteNext) delete socketDescriptor;
00408 }
00409
00410 Boolean SocketDescriptor::tcpReadHandler1(int mask) {
00411
00412
00413
00414
00415
00416
00417
00418
00419 u_int8_t c;
00420 struct sockaddr_in fromAddress;
00421 if (fTCPReadingState != AWAITING_PACKET_DATA) {
00422 int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
00423 if (result == 0) {
00424 return False;
00425 } else if (result != 1) {
00426 #ifdef DEBUG_RECEIVE
00427 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result);
00428 #endif
00429 fReadErrorOccurred = True;
00430 delete this;
00431 return False;
00432 }
00433 }
00434
00435 Boolean callAgain = True;
00436 switch (fTCPReadingState) {
00437 case AWAITING_DOLLAR: {
00438 if (c == '$') {
00439 #ifdef DEBUG_RECEIVE
00440 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw '$'\n", fOurSocketNum);
00441 #endif
00442 fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
00443 } else {
00444
00445 if (fServerRequestAlternativeByteHandler != NULL && c != 0xFF && c != 0xFE) {
00446
00447 (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
00448 }
00449 }
00450 break;
00451 }
00452 case AWAITING_STREAM_CHANNEL_ID: {
00453
00454 if (lookupRTPInterface(c) != NULL) {
00455 fStreamChannelId = c;
00456 fTCPReadingState = AWAITING_SIZE1;
00457 } else {
00458
00459 #ifdef DEBUG_RECEIVE
00460 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw nonexistent stream channel id: 0x%02x\n", fOurSocketNum, c);
00461 #endif
00462 fTCPReadingState = AWAITING_DOLLAR;
00463 }
00464 break;
00465 }
00466 case AWAITING_SIZE1: {
00467
00468 fSizeByte1 = c;
00469 fTCPReadingState = AWAITING_SIZE2;
00470 break;
00471 }
00472 case AWAITING_SIZE2: {
00473
00474 unsigned short size = (fSizeByte1<<8)|c;
00475
00476
00477 RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00478 if (rtpInterface != NULL) {
00479 rtpInterface->fNextTCPReadSize = size;
00480 rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
00481 rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
00482 }
00483 fTCPReadingState = AWAITING_PACKET_DATA;
00484 break;
00485 }
00486 case AWAITING_PACKET_DATA: {
00487 callAgain = False;
00488 fTCPReadingState = AWAITING_DOLLAR;
00489
00490 RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00491 if (rtpInterface != NULL) {
00492 if (rtpInterface->fNextTCPReadSize == 0) {
00493
00494 break;
00495 }
00496 if (rtpInterface->fReadHandlerProc != NULL) {
00497 #ifdef DEBUG_RECEIVE
00498 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): reading %d bytes on channel %d\n", fOurSocketNum, rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
00499 #endif
00500 fTCPReadingState = AWAITING_PACKET_DATA;
00501 rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00502 } else {
00503 #ifdef DEBUG_RECEIVE
00504 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No handler proc for \"rtpInterface\" for channel %d; need to skip %d remaining bytes\n", fOurSocketNum, fStreamChannelId, rtpInterface->fNextTCPReadSize);
00505 #endif
00506 int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
00507 if (result < 0) {
00508 #ifdef DEBUG_RECEIVE
00509 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result);
00510 #endif
00511 fReadErrorOccurred = True;
00512 delete this;
00513 return False;
00514 } else {
00515 fTCPReadingState = AWAITING_PACKET_DATA;
00516 if (result == 1) {
00517 --rtpInterface->fNextTCPReadSize;
00518 callAgain = True;
00519 }
00520 }
00521 }
00522 }
00523 #ifdef DEBUG_RECEIVE
00524 else fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No \"rtpInterface\" for channel %d\n", fOurSocketNum, fStreamChannelId);
00525 #endif
00526 }
00527 }
00528
00529 return callAgain;
00530 }
00531
00532
00534
00535 tcpStreamRecord
00536 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00537 tcpStreamRecord* next)
00538 : fNext(next),
00539 fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00540 }
00541
00542 tcpStreamRecord::~tcpStreamRecord() {
00543 delete fNext;
00544 }