liveMedia/RTPInterface.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2013 Live Networks, Inc.  All rights reserved.
00018 // An abstraction of a network interface used for RTP (or RTCP).
00019 // (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to
00020 // be implemented transparently.)
00021 // Implementation
00022 
00023 #include "RTPInterface.hh"
00024 #include <GroupsockHelper.hh>
00025 #include <stdio.h>
00026 
00028 
00029 // Helper routines and data structures, used to implement
00030 // sending/receiving RTP/RTCP over a TCP socket:
00031 
00032 // Reading RTP-over-TCP is implemented using two levels of hash tables.
00033 // The top-level hash table maps TCP socket numbers to a
00034 // "SocketDescriptor" that contains a hash table for each of the
00035 // sub-channels that are reading from this socket.
00036 
00037 static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
00038   _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
00039   if (ourTables == NULL) return NULL;
00040 
00041   if (ourTables->socketTable == NULL) {
00042     // Create a new socket number -> SocketDescriptor mapping table:
00043     ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
00044   }
00045   return (HashTable*)(ourTables->socketTable);
00046 }
00047 
00048 class SocketDescriptor {
00049 public:
00050   SocketDescriptor(UsageEnvironment& env, int socketNum);
00051   virtual ~SocketDescriptor();
00052 
00053   void registerRTPInterface(unsigned char streamChannelId,
00054                             RTPInterface* rtpInterface);
00055   RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
00056   void deregisterRTPInterface(unsigned char streamChannelId);
00057 
00058   void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
00059     fServerRequestAlternativeByteHandler = handler;
00060     fServerRequestAlternativeByteHandlerClientData = clientData;
00061   }
00062 
00063 private:
00064   static void tcpReadHandler(SocketDescriptor*, int mask);
00065   Boolean tcpReadHandler1(int mask);
00066 
00067 private:
00068   UsageEnvironment& fEnv;
00069   int fOurSocketNum;
00070   HashTable* fSubChannelHashTable;
00071   ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
00072   void* fServerRequestAlternativeByteHandlerClientData;
00073   u_int8_t fStreamChannelId, fSizeByte1;
00074   Boolean fReadErrorOccurred, fDeleteNext;
00075   enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
00076 };
00077 
00078 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
00079   HashTable* table = socketHashTable(env, createIfNotFound);
00080   if (table == NULL) return NULL;
00081 
00082   char const* key = (char const*)(long)sockNum;
00083   SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
00084   if (socketDescriptor == NULL) {
00085     if (createIfNotFound) {
00086       socketDescriptor = new SocketDescriptor(env, sockNum);
00087       table->Add((char const*)(long)(sockNum), socketDescriptor);
00088     } else if (table->IsEmpty()) {
00089       // We can also delete the table (to reclaim space):
00090       _Tables* ourTables = _Tables::getOurTables(env);
00091       delete table;
00092       ourTables->socketTable = NULL;
00093       ourTables->reclaimIfPossible();
00094     }
00095   }
00096 
00097   return socketDescriptor;
00098 }
00099 
00100 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
00101   char const* key = (char const*)(long)sockNum;
00102   HashTable* table = socketHashTable(env);
00103   table->Remove(key);
00104 
00105   if (table->IsEmpty()) {
00106     // We can also delete the table (to reclaim space):
00107     _Tables* ourTables = _Tables::getOurTables(env);
00108     delete table;
00109     ourTables->socketTable = NULL;
00110     ourTables->reclaimIfPossible();
00111   }
00112 }
00113 
00114 
00116 
00117 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
00118   : fOwner(owner), fGS(gs),
00119     fTCPStreams(NULL),
00120     fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
00121     fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
00122     fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
00123   // Make the socket non-blocking, even though it will be read from only asynchronously, when packets arrive.
00124   // The reason for this is that, in some OSs, reads on a blocking socket can (allegedly) sometimes block,
00125   // even if the socket was previously reported (e.g., by "select()") as having data available.
00126   // (This can supposedly happen if the UDP checksum fails, for example.)
00127   makeSocketNonBlocking(fGS->socketNum());
00128   increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
00129 }
00130 
00131 RTPInterface::~RTPInterface() {
00132   delete fTCPStreams;
00133 }
00134 
00135 void RTPInterface::setStreamSocket(int sockNum,
00136                                    unsigned char streamChannelId) {
00137   fGS->removeAllDestinations();
00138   addStreamSocket(sockNum, streamChannelId);
00139 }
00140 
00141 void RTPInterface::addStreamSocket(int sockNum,
00142                                    unsigned char streamChannelId) {
00143   if (sockNum < 0) return;
00144 
00145   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00146        streams = streams->fNext) {
00147     if (streams->fStreamSocketNum == sockNum
00148         && streams->fStreamChannelId == streamChannelId) {
00149       return; // we already have it
00150     }
00151   }
00152 
00153   fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
00154 
00155   // Also, make sure this new socket is set up for receiving RTP/RTCP-over-TCP:
00156   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), sockNum);
00157   socketDescriptor->registerRTPInterface(streamChannelId, this);
00158 }
00159 
00160 static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
00161   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
00162   if (socketDescriptor != NULL) {
00163     socketDescriptor->deregisterRTPInterface(streamChannelId);
00164         // Note: This may delete "socketDescriptor",
00165         // if no more interfaces are using this socket
00166   }
00167 }
00168 
00169 void RTPInterface::removeStreamSocket(int sockNum,
00170                                       unsigned char streamChannelId) {
00171   for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
00172        streamsPtr = &((*streamsPtr)->fNext)) {
00173     if ((*streamsPtr)->fStreamSocketNum == sockNum
00174         && (*streamsPtr)->fStreamChannelId == streamChannelId) {
00175       deregisterSocket(envir(), sockNum, streamChannelId);
00176 
00177       // Then remove the record pointed to by *streamsPtr :
00178       tcpStreamRecord* next = (*streamsPtr)->fNext;
00179       (*streamsPtr)->fNext = NULL;
00180       delete (*streamsPtr);
00181       *streamsPtr = next;
00182       return;
00183     }
00184   }
00185 }
00186 
00187 void RTPInterface
00188 ::setServerRequestAlternativeByteHandler(int socketNum, ServerRequestAlternativeByteHandler* handler, void* clientData) {
00189   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), socketNum);
00190 
00191   if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
00192 }
00193 
00194 
00195 Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
00196   Boolean success = True; // we'll return False instead if any of the sends fail
00197 
00198   // Normal case: Send as a UDP packet:
00199   if (!fGS->output(envir(), fGS->ttl(), packet, packetSize)) success = False;
00200 
00201   // Also, send over each of our TCP sockets:
00202   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00203        streams = streams->fNext) {
00204     if (!sendRTPorRTCPPacketOverTCP(packet, packetSize,
00205                                     streams->fStreamSocketNum, streams->fStreamChannelId)) {
00206       success = False;
00207     }
00208   }
00209 
00210   return success;
00211 }
00212 
00213 void RTPInterface
00214 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
00215   // Normal case: Arrange to read UDP packets:
00216   envir().taskScheduler().
00217     turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
00218 
00219   // Also, receive RTP over TCP, on each of our TCP connections:
00220   fReadHandlerProc = handlerProc;
00221   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00222        streams = streams->fNext) {
00223     // Get a socket descriptor for "streams->fStreamSocketNum":
00224     SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
00225 
00226     // Tell it about our subChannel:
00227     socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
00228   }
00229 }
00230 
00231 Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
00232                                  unsigned& bytesRead, struct sockaddr_in& fromAddress, Boolean& packetReadWasIncomplete) {
00233   packetReadWasIncomplete = False; // by default
00234   Boolean readSuccess;
00235   if (fNextTCPReadStreamSocketNum < 0) {
00236     // Normal case: read from the (datagram) 'groupsock':
00237     readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
00238   } else {
00239     // Read from the TCP connection:
00240     bytesRead = 0;
00241     unsigned totBytesToRead = fNextTCPReadSize;
00242     if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
00243     unsigned curBytesToRead = totBytesToRead;
00244     int curBytesRead;
00245     while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
00246                                       &buffer[bytesRead], curBytesToRead,
00247                                       fromAddress)) > 0) {
00248       bytesRead += curBytesRead;
00249       if (bytesRead >= totBytesToRead) break;
00250       curBytesToRead -= curBytesRead;
00251     }
00252     fNextTCPReadSize -= bytesRead;
00253     if (fNextTCPReadSize == 0) {
00254       // We've read all of the data that we asked for
00255       readSuccess = True;
00256     } else if (curBytesRead < 0) {
00257       // There was an error reading the socket
00258       bytesRead = 0;
00259       readSuccess = False;
00260     } else {
00261       // We need to read more bytes, and there was not an error reading the socket
00262       packetReadWasIncomplete = True;
00263       return True;
00264     }
00265     fNextTCPReadStreamSocketNum = -1; // default, for next time
00266   }
00267 
00268   if (readSuccess && fAuxReadHandlerFunc != NULL) {
00269     // Also pass the newly-read packet data to our auxilliary handler:
00270     (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
00271   }
00272   return readSuccess;
00273 }
00274 
00275 void RTPInterface::stopNetworkReading() {
00276   // Normal case
00277   envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
00278 
00279   // Also turn off read handling on each of our TCP connections:
00280   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
00281        streams = streams->fNext) {
00282     deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
00283   }
00284 }
00285 
00286 
00288 
00289 Boolean RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize,
00290                                                  int socketNum, unsigned char streamChannelId) {
00291 #ifdef DEBUG_SEND
00292   fprintf(stderr, "sendRTPorRTCPPacketOverTCP: %d bytes over channel %d (socket %d)\n",
00293           packetSize, streamChannelId, socketNum); fflush(stderr);
00294 #endif
00295   // Send a RTP/RTCP packet over TCP, using the encoding defined in RFC 2326, section 10.12:
00296   //     $<streamChannelId><packetSize><packet>
00297   // (If the initial "send()" of '$<streamChannelId><packetSize>' succeeds, then we force the subsequent "send()" for
00298   //  the <packet> data to succeed, even if we have to do so with a blocking "send()".)
00299   do {
00300     u_int8_t framingHeader[4];
00301     framingHeader[0] = '$';
00302     framingHeader[1] = streamChannelId;
00303     framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8);
00304     framingHeader[3] = (u_int8_t) (packetSize&0xFF);
00305     if (!sendDataOverTCP(socketNum, framingHeader, 4, False)) break;
00306 
00307     if (!sendDataOverTCP(socketNum, packet, packetSize, True)) break;
00308 #ifdef DEBUG_SEND
00309     fprintf(stderr, "sendRTPorRTCPPacketOverTCP: completed\n"); fflush(stderr);
00310 #endif
00311 
00312     return True;
00313   } while (0);
00314 
00315 #ifdef DEBUG_SEND
00316   fprintf(stderr, "sendRTPorRTCPPacketOverTCP: failed! (errno %d)\n", envir().getErrno()); fflush(stderr);
00317 #endif
00318   return False;
00319 }
00320 
00321 Boolean RTPInterface::sendDataOverTCP(int socketNum, u_int8_t const* data, unsigned dataSize, Boolean forceSendToSucceed) {
00322   if (send(socketNum, (char const*)data, dataSize, 0/*flags*/) != (int)dataSize) {
00323     // The TCP send() failed.
00324 
00325     if (forceSendToSucceed && envir().getErrno() == EAGAIN) {
00326       // The OS's TCP send buffer has filled up (because the stream's bitrate has exceeded the capacity of the TCP connection!).
00327       // Force this data write to succeed, by blocking if necessary until it does:
00328 #ifdef DEBUG_SEND
00329       fprintf(stderr, "sendDataOverTCP: resending %d-byte send (blocking)\n", dataSize); fflush(stderr);
00330 #endif
00331       makeSocketBlocking(socketNum);
00332       Boolean sendSuccess = send(socketNum, (char const*)data, dataSize, 0/*flags*/) == (int)dataSize;
00333       makeSocketNonBlocking(socketNum);
00334       return sendSuccess;
00335     }
00336     return False;
00337   }
00338 
00339   return True;
00340 }
00341 
00342 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
00343   :fEnv(env), fOurSocketNum(socketNum),
00344     fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
00345    fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
00346    fReadErrorOccurred(False), fDeleteNext(False), fTCPReadingState(AWAITING_DOLLAR) {
00347 }
00348 
00349 SocketDescriptor::~SocketDescriptor() {
00350   fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
00351   if (fServerRequestAlternativeByteHandler != NULL) {
00352     // Hack: Pass a special character to our alternative byte handler, to tell it that either
00353     // - an error occurred when reading the TCP socket, or
00354     // - no error occurred, but it needs to take over control of the TCP socket once again.
00355     u_int8_t specialChar = fReadErrorOccurred ? 0xFF : 0xFE;
00356     (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, specialChar);
00357   }
00358   removeSocketDescription(fEnv, fOurSocketNum);
00359 
00360   if (fSubChannelHashTable != NULL) {
00361     while (fSubChannelHashTable->RemoveNext() != NULL) {} // remove the "RTPInterface"s from the table, but don't delete them
00362     delete fSubChannelHashTable;
00363   }
00364 }
00365 
00366 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
00367                                             RTPInterface* rtpInterface) {
00368   Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
00369 #if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE)
00370   fprintf(stderr, "SocketDescriptor(socket %d)::registerRTPInterface(channel %d): isFirstRegistration %d\n", fOurSocketNum, streamChannelId, isFirstRegistration);
00371 #endif
00372   fSubChannelHashTable->Add((char const*)(long)streamChannelId,
00373                             rtpInterface);
00374 
00375   if (isFirstRegistration) {
00376     // Arrange to handle reads on this TCP socket:
00377     TaskScheduler::BackgroundHandlerProc* handler
00378       = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
00379     fEnv.taskScheduler().
00380       setBackgroundHandling(fOurSocketNum, SOCKET_READABLE|SOCKET_EXCEPTION, handler, this);
00381   }
00382 }
00383 
00384 RTPInterface* SocketDescriptor
00385 ::lookupRTPInterface(unsigned char streamChannelId) {
00386   char const* lookupArg = (char const*)(long)streamChannelId;
00387   return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
00388 }
00389 
00390 void SocketDescriptor
00391 ::deregisterRTPInterface(unsigned char streamChannelId) {
00392 #if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE)
00393   fprintf(stderr, "SocketDescriptor(socket %d)::deregisterRTPInterface(channel %d)\n", fOurSocketNum, streamChannelId);
00394 #endif
00395   fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
00396 
00397   if (fSubChannelHashTable->IsEmpty()) {
00398     // No more interfaces are using us, so it's curtains for us now:
00399     fDeleteNext = True; // hack to cause ourself to be deleted from "tcpReadHandler()" below
00400   }
00401 }
00402 
00403 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
00404   // Call the read handler until it returns false, with a limit to avoid starving other sockets
00405   unsigned count = 2000;
00406   while (!socketDescriptor->fDeleteNext && socketDescriptor->tcpReadHandler1(mask) && --count > 0) {}
00407   if (socketDescriptor->fDeleteNext) delete socketDescriptor;
00408 }
00409 
00410 Boolean SocketDescriptor::tcpReadHandler1(int mask) {
00411   // We expect the following data over the TCP channel:
00412   //   optional RTSP command or response bytes (before the first '$' character)
00413   //   a '$' character
00414   //   a 1-byte channel id
00415   //   a 2-byte packet size (in network byte order)
00416   //   the packet data.
00417   // However, because the socket is being read asynchronously, this data might arrive in pieces.
00418   
00419   u_int8_t c;
00420   struct sockaddr_in fromAddress;
00421   if (fTCPReadingState != AWAITING_PACKET_DATA) {
00422     int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
00423     if (result == 0) { // There was no more data to read
00424       return False;
00425     } else if (result != 1) { // error reading TCP socket, so we will no longer handle it
00426 #ifdef DEBUG_RECEIVE
00427       fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result);
00428 #endif
00429       fReadErrorOccurred = True;
00430       delete this;
00431       return False;
00432     }
00433   }
00434 
00435   Boolean callAgain = True;
00436   switch (fTCPReadingState) {
00437     case AWAITING_DOLLAR: {
00438       if (c == '$') {
00439 #ifdef DEBUG_RECEIVE
00440         fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw '$'\n", fOurSocketNum);
00441 #endif
00442         fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
00443       } else {
00444         // This character is part of a RTSP request or command, which is handled separately:
00445         if (fServerRequestAlternativeByteHandler != NULL && c != 0xFF && c != 0xFE) {
00446           // Hack: 0xFF and 0xFE are used as special signaling characters, so don't send them
00447           (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
00448         }
00449       }
00450       break;
00451     }
00452     case AWAITING_STREAM_CHANNEL_ID: {
00453       // The byte that we read is the stream channel id.
00454       if (lookupRTPInterface(c) != NULL) { // sanity check
00455         fStreamChannelId = c;
00456         fTCPReadingState = AWAITING_SIZE1;
00457       } else {
00458         // This wasn't a stream channel id that we expected.  We're (somehow) in a strange state.  Try to recover:
00459 #ifdef DEBUG_RECEIVE
00460         fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw nonexistent stream channel id: 0x%02x\n", fOurSocketNum, c);
00461 #endif
00462         fTCPReadingState = AWAITING_DOLLAR;
00463       }
00464       break;
00465     }
00466     case AWAITING_SIZE1: {
00467       // The byte that we read is the first (high) byte of the 16-bit RTP or RTCP packet 'size'.
00468       fSizeByte1 = c;
00469       fTCPReadingState = AWAITING_SIZE2;
00470       break;
00471     }
00472     case AWAITING_SIZE2: {
00473       // The byte that we read is the second (low) byte of the 16-bit RTP or RTCP packet 'size'.
00474       unsigned short size = (fSizeByte1<<8)|c;
00475       
00476       // Record the information about the packet data that will be read next:
00477       RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00478       if (rtpInterface != NULL) {
00479         rtpInterface->fNextTCPReadSize = size;
00480         rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
00481         rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
00482       }
00483       fTCPReadingState = AWAITING_PACKET_DATA;
00484       break;
00485     }
00486     case AWAITING_PACKET_DATA: {
00487       callAgain = False;
00488       fTCPReadingState = AWAITING_DOLLAR; // the next state, unless we end up having to read more data in the current state
00489       // Call the appropriate read handler to get the packet data from the TCP stream:
00490       RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
00491       if (rtpInterface != NULL) {
00492         if (rtpInterface->fNextTCPReadSize == 0) {
00493           // We've already read all the data for this packet.
00494           break;
00495         }
00496         if (rtpInterface->fReadHandlerProc != NULL) {
00497 #ifdef DEBUG_RECEIVE
00498           fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): reading %d bytes on channel %d\n", fOurSocketNum, rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
00499 #endif
00500           fTCPReadingState = AWAITING_PACKET_DATA;
00501           rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
00502         } else {
00503 #ifdef DEBUG_RECEIVE
00504           fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No handler proc for \"rtpInterface\" for channel %d; need to skip %d remaining bytes\n", fOurSocketNum, fStreamChannelId, rtpInterface->fNextTCPReadSize);
00505 #endif
00506           int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
00507           if (result < 0) { // error reading TCP socket, so we will no longer handle it
00508 #ifdef DEBUG_RECEIVE
00509             fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result);
00510 #endif
00511             fReadErrorOccurred = True;
00512             delete this;
00513             return False;
00514           } else {
00515             fTCPReadingState = AWAITING_PACKET_DATA;
00516             if (result == 1) {
00517               --rtpInterface->fNextTCPReadSize;
00518               callAgain = True;
00519             }
00520           }
00521         }
00522       }
00523 #ifdef DEBUG_RECEIVE
00524       else fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No \"rtpInterface\" for channel %d\n", fOurSocketNum, fStreamChannelId);
00525 #endif
00526     }
00527   }
00528 
00529   return callAgain;
00530 }
00531 
00532 
00534 
00535 tcpStreamRecord
00536 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
00537                   tcpStreamRecord* next)
00538   : fNext(next),
00539     fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
00540 }
00541 
00542 tcpStreamRecord::~tcpStreamRecord() {
00543   delete fNext;
00544 }

Generated on Mon Apr 29 13:28:03 2013 for live by  doxygen 1.5.2