liveMedia/TCPStreamSink.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2013 Live Networks, Inc.  All rights reserved.
00018 // A sink representing a TCP output stream
00019 // Implementation
00020 
00021 #include "TCPStreamSink.hh"
00022 
00023 TCPStreamSink* TCPStreamSink::createNew(UsageEnvironment& env, int socketNum) {
00024   return new TCPStreamSink(env, socketNum);
00025 }
00026 
00027 TCPStreamSink::TCPStreamSink(UsageEnvironment& env, int socketNum)
00028   : MediaSink(env),
00029     fUnwrittenBytesStart(0), fUnwrittenBytesEnd(0),
00030     fInputSourceIsOpen(False), fOutputSocketIsWritable(True),
00031     fOutputSocketNum(socketNum) {
00032 }
00033 
00034 TCPStreamSink::~TCPStreamSink() {
00035 }
00036 
00037 Boolean TCPStreamSink::continuePlaying() {
00038   fInputSourceIsOpen = fSource != NULL;
00039   processBuffer();
00040 
00041   return True;
00042 }
00043 
00044 #define TCP_STREAM_SINK_MIN_READ_SIZE 1000
00045 
00046 void TCPStreamSink::processBuffer() {
00047   // First, try writing data to our output socket, if we can:
00048   if (fOutputSocketIsWritable && numUnwrittenBytes() > 0) {
00049     int numBytesWritten
00050       = send(fOutputSocketNum, (const char*)&fBuffer[fUnwrittenBytesStart], numUnwrittenBytes(), 0);
00051     if (numBytesWritten < (int)numUnwrittenBytes()) {
00052       // The output socket is no longer writable.  Set a handler to be called when it becomes writable again.
00053       fOutputSocketIsWritable = False;
00054       envir().taskScheduler().setBackgroundHandling(fOutputSocketNum, SOCKET_WRITABLE, socketWritableHandler, this);
00055     }
00056     if (numBytesWritten > 0) {
00057       // We wrote at least some of our data.  Update our buffer pointers:
00058       fUnwrittenBytesStart += numBytesWritten;
00059       if (fUnwrittenBytesStart > fUnwrittenBytesEnd) fUnwrittenBytesStart = fUnwrittenBytesEnd; // sanity check
00060       if (fUnwrittenBytesStart == fUnwrittenBytesEnd && (!fInputSourceIsOpen || !fSource->isCurrentlyAwaitingData())) {
00061         fUnwrittenBytesStart = fUnwrittenBytesEnd = 0; // reset the buffer to empty
00062       }
00063     }
00064   }
00065 
00066   // Then, read from our input source, if we can (& we're not already reading from it):
00067   if (fInputSourceIsOpen && freeBufferSpace() >= TCP_STREAM_SINK_MIN_READ_SIZE && !fSource->isCurrentlyAwaitingData()) {
00068     fSource->getNextFrame(&fBuffer[fUnwrittenBytesEnd], freeBufferSpace(), afterGettingFrame, this, ourOnSourceClosure, this);
00069   }
00070 
00071   if (!fInputSourceIsOpen && numUnwrittenBytes() == 0) {
00072     // We're now done:
00073     onSourceClosure(this);
00074   }
00075 }
00076 
00077 void TCPStreamSink::socketWritableHandler(void* clientData, int /*mask*/) {
00078   TCPStreamSink* sink = (TCPStreamSink*)clientData;
00079   sink->socketWritableHandler1();
00080 }
00081 
00082 void TCPStreamSink::socketWritableHandler1() {
00083   envir().taskScheduler().disableBackgroundHandling(fOutputSocketNum); // disable this handler until the next time it's needed
00084 
00085   fOutputSocketIsWritable = True;
00086   processBuffer();
00087 }
00088 
00089 void TCPStreamSink::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,
00090                                 struct timeval /*presentationTime*/, unsigned /*durationInMicroseconds*/) {
00091   TCPStreamSink* sink = (TCPStreamSink*)clientData;
00092   sink->afterGettingFrame(frameSize, numTruncatedBytes);
00093 }
00094 
00095 void TCPStreamSink::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes) {
00096   if (numTruncatedBytes > 0) {
00097     envir() << "TCPStreamSink::afterGettingFrame(): The input frame data was too large for our buffer.  "
00098             << numTruncatedBytes
00099             << " bytes of trailing data was dropped!  Correct this by increasing the definition of \"TCP_STREAM_SINK_BUFFER_SIZE\" in \"include/TCPStreamSink.hh\".\n";
00100   }
00101   fUnwrittenBytesEnd += frameSize;
00102   processBuffer();
00103 }
00104 
00105 void TCPStreamSink::ourOnSourceClosure(void* clientData) {
00106   TCPStreamSink* sink = (TCPStreamSink*)clientData;
00107   sink->ourOnSourceClosure1();
00108 }
00109 
00110 void TCPStreamSink::ourOnSourceClosure1() {
00111   // The input source has closed:
00112   fInputSourceIsOpen = False;
00113   processBuffer();
00114 }

Generated on Thu May 30 08:13:01 2013 for live by  doxygen 1.5.2