00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "TCPStreamSink.hh"
00022
00023 TCPStreamSink* TCPStreamSink::createNew(UsageEnvironment& env, int socketNum) {
00024 return new TCPStreamSink(env, socketNum);
00025 }
00026
00027 TCPStreamSink::TCPStreamSink(UsageEnvironment& env, int socketNum)
00028 : MediaSink(env),
00029 fUnwrittenBytesStart(0), fUnwrittenBytesEnd(0),
00030 fInputSourceIsOpen(False), fOutputSocketIsWritable(True),
00031 fOutputSocketNum(socketNum) {
00032 }
00033
00034 TCPStreamSink::~TCPStreamSink() {
00035 }
00036
00037 Boolean TCPStreamSink::continuePlaying() {
00038 fInputSourceIsOpen = fSource != NULL;
00039 processBuffer();
00040
00041 return True;
00042 }
00043
00044 #define TCP_STREAM_SINK_MIN_READ_SIZE 1000
00045
00046 void TCPStreamSink::processBuffer() {
00047
00048 if (fOutputSocketIsWritable && numUnwrittenBytes() > 0) {
00049 int numBytesWritten
00050 = send(fOutputSocketNum, (const char*)&fBuffer[fUnwrittenBytesStart], numUnwrittenBytes(), 0);
00051 if (numBytesWritten < (int)numUnwrittenBytes()) {
00052
00053 fOutputSocketIsWritable = False;
00054 envir().taskScheduler().setBackgroundHandling(fOutputSocketNum, SOCKET_WRITABLE, socketWritableHandler, this);
00055 }
00056 if (numBytesWritten > 0) {
00057
00058 fUnwrittenBytesStart += numBytesWritten;
00059 if (fUnwrittenBytesStart > fUnwrittenBytesEnd) fUnwrittenBytesStart = fUnwrittenBytesEnd;
00060 if (fUnwrittenBytesStart == fUnwrittenBytesEnd && (!fInputSourceIsOpen || !fSource->isCurrentlyAwaitingData())) {
00061 fUnwrittenBytesStart = fUnwrittenBytesEnd = 0;
00062 }
00063 }
00064 }
00065
00066
00067 if (fInputSourceIsOpen && freeBufferSpace() >= TCP_STREAM_SINK_MIN_READ_SIZE && !fSource->isCurrentlyAwaitingData()) {
00068 fSource->getNextFrame(&fBuffer[fUnwrittenBytesEnd], freeBufferSpace(), afterGettingFrame, this, ourOnSourceClosure, this);
00069 }
00070
00071 if (!fInputSourceIsOpen && numUnwrittenBytes() == 0) {
00072
00073 onSourceClosure(this);
00074 }
00075 }
00076
00077 void TCPStreamSink::socketWritableHandler(void* clientData, int ) {
00078 TCPStreamSink* sink = (TCPStreamSink*)clientData;
00079 sink->socketWritableHandler1();
00080 }
00081
00082 void TCPStreamSink::socketWritableHandler1() {
00083 envir().taskScheduler().disableBackgroundHandling(fOutputSocketNum);
00084
00085 fOutputSocketIsWritable = True;
00086 processBuffer();
00087 }
00088
00089 void TCPStreamSink::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,
00090 struct timeval , unsigned ) {
00091 TCPStreamSink* sink = (TCPStreamSink*)clientData;
00092 sink->afterGettingFrame(frameSize, numTruncatedBytes);
00093 }
00094
00095 void TCPStreamSink::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes) {
00096 if (numTruncatedBytes > 0) {
00097 envir() << "TCPStreamSink::afterGettingFrame(): The input frame data was too large for our buffer. "
00098 << numTruncatedBytes
00099 << " bytes of trailing data was dropped! Correct this by increasing the definition of \"TCP_STREAM_SINK_BUFFER_SIZE\" in \"include/TCPStreamSink.hh\".\n";
00100 }
00101 fUnwrittenBytesEnd += frameSize;
00102 processBuffer();
00103 }
00104
00105 void TCPStreamSink::ourOnSourceClosure(void* clientData) {
00106 TCPStreamSink* sink = (TCPStreamSink*)clientData;
00107 sink->ourOnSourceClosure1();
00108 }
00109
00110 void TCPStreamSink::ourOnSourceClosure1() {
00111
00112 fInputSourceIsOpen = False;
00113 processBuffer();
00114 }