liveMedia/MPEG2TransportStreamFramer.cpp

Go to the documentation of this file.
00001 /**********
00002 This library is free software; you can redistribute it and/or modify it under
00003 the terms of the GNU Lesser General Public License as published by the
00004 Free Software Foundation; either version 2.1 of the License, or (at your
00005 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
00006 
00007 This library is distributed in the hope that it will be useful, but WITHOUT
00008 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00009 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
00010 more details.
00011 
00012 You should have received a copy of the GNU Lesser General Public License
00013 along with this library; if not, write to the Free Software Foundation, Inc.,
00014 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
00015 **********/
00016 // "liveMedia"
00017 // Copyright (c) 1996-2013 Live Networks, Inc.  All rights reserved.
00018 // A filter that passes through (unchanged) chunks that contain an integral number
00019 // of MPEG-2 Transport Stream packets, but returning (in "fDurationInMicroseconds")
00020 // an updated estimate of the time gap between chunks.
00021 // Implementation
00022 
00023 #include "MPEG2TransportStreamFramer.hh"
00024 #include <GroupsockHelper.hh> // for "gettimeofday()"
00025 
00026 #define TRANSPORT_PACKET_SIZE 188
00027 
00029 
00030 #if !defined(NEW_DURATION_WEIGHT)
00031 #define NEW_DURATION_WEIGHT 0.5
00032   // How much weight to give to the latest duration measurement (must be <= 1)
00033 #endif
00034 
00035 #if !defined(TIME_ADJUSTMENT_FACTOR)
00036 #define TIME_ADJUSTMENT_FACTOR 0.8
00037   // A factor by which to adjust the duration estimate to ensure that the overall
00038   // packet transmission times remains matched with the PCR times (which will be the
00039   // times that we expect receivers to play the incoming packets).
00040   // (must be <= 1)
00041 #endif
00042 
00043 #if !defined(MAX_PLAYOUT_BUFFER_DURATION)
00044 #define MAX_PLAYOUT_BUFFER_DURATION 0.1 // (seconds)
00045 #endif
00046 
00047 #if !defined(PCR_PERIOD_VARIATION_RATIO)
00048 #define PCR_PERIOD_VARIATION_RATIO 0.5
00049 #endif
00050 
00052 
00053 class PIDStatus {
00054 public:
00055   PIDStatus(double _firstClock, double _firstRealTime)
00056     : firstClock(_firstClock), lastClock(_firstClock),
00057       firstRealTime(_firstRealTime), lastRealTime(_firstRealTime),
00058       lastPacketNum(0) {
00059   }
00060 
00061   double firstClock, lastClock, firstRealTime, lastRealTime;
00062   u_int64_t lastPacketNum;
00063 };
00064 
00065 
00067 
00068 MPEG2TransportStreamFramer* MPEG2TransportStreamFramer
00069 ::createNew(UsageEnvironment& env, FramedSource* inputSource) {
00070   return new MPEG2TransportStreamFramer(env, inputSource);
00071 }
00072 
00073 MPEG2TransportStreamFramer
00074 ::MPEG2TransportStreamFramer(UsageEnvironment& env, FramedSource* inputSource)
00075   : FramedFilter(env, inputSource),
00076     fTSPacketCount(0), fTSPacketDurationEstimate(0.0), fTSPCRCount(0),
00077     fLimitNumTSPacketsToStream(False), fNumTSPacketsToStream(0),
00078     fLimitTSPacketsToStreamByPCR(False), fPCRLimit(0.0) {
00079   fPIDStatusTable = HashTable::create(ONE_WORD_HASH_KEYS);
00080 }
00081 
00082 MPEG2TransportStreamFramer::~MPEG2TransportStreamFramer() {
00083   clearPIDStatusTable();
00084   delete fPIDStatusTable;
00085 }
00086 
00087 void MPEG2TransportStreamFramer::clearPIDStatusTable() {
00088   PIDStatus* pidStatus;
00089   while ((pidStatus = (PIDStatus*)fPIDStatusTable->RemoveNext()) != NULL) {
00090     delete pidStatus;
00091   }
00092 }
00093 
00094 void MPEG2TransportStreamFramer::setNumTSPacketsToStream(unsigned long numTSRecordsToStream) {
00095   fNumTSPacketsToStream = numTSRecordsToStream;
00096   fLimitNumTSPacketsToStream = numTSRecordsToStream > 0;
00097 }
00098 
00099 void MPEG2TransportStreamFramer::setPCRLimit(float pcrLimit) {
00100   fPCRLimit = pcrLimit;
00101   fLimitTSPacketsToStreamByPCR = pcrLimit != 0.0;
00102 }
00103 
00104 void MPEG2TransportStreamFramer::doGetNextFrame() {
00105   if (fLimitNumTSPacketsToStream) {
00106     if (fNumTSPacketsToStream == 0) {
00107       handleClosure(this);
00108       return;
00109     }
00110     if (fNumTSPacketsToStream*TRANSPORT_PACKET_SIZE < fMaxSize) {
00111       fMaxSize = fNumTSPacketsToStream*TRANSPORT_PACKET_SIZE;
00112     }
00113   }
00114 
00115   // Read directly from our input source into our client's buffer:
00116   fFrameSize = 0;
00117   fInputSource->getNextFrame(fTo, fMaxSize,
00118                              afterGettingFrame, this,
00119                              FramedSource::handleClosure, this);
00120 }
00121 
00122 void MPEG2TransportStreamFramer::doStopGettingFrames() {
00123   FramedFilter::doStopGettingFrames();
00124   fTSPacketCount = 0;
00125   fTSPCRCount = 0;
00126 
00127   clearPIDStatusTable();
00128 }
00129 
00130 void MPEG2TransportStreamFramer
00131 ::afterGettingFrame(void* clientData, unsigned frameSize,
00132                     unsigned /*numTruncatedBytes*/,
00133                     struct timeval presentationTime,
00134                     unsigned /*durationInMicroseconds*/) {
00135   MPEG2TransportStreamFramer* framer = (MPEG2TransportStreamFramer*)clientData;
00136   framer->afterGettingFrame1(frameSize, presentationTime);
00137 }
00138 
00139 #define TRANSPORT_SYNC_BYTE 0x47
00140 
00141 void MPEG2TransportStreamFramer::afterGettingFrame1(unsigned frameSize,
00142                                                     struct timeval presentationTime) {
00143   fFrameSize += frameSize;
00144   unsigned const numTSPackets = fFrameSize/TRANSPORT_PACKET_SIZE;
00145   fNumTSPacketsToStream -= numTSPackets;
00146   fFrameSize = numTSPackets*TRANSPORT_PACKET_SIZE; // an integral # of TS packets
00147   if (fFrameSize == 0) {
00148     // We didn't read a complete TS packet; assume that the input source has closed.
00149     handleClosure(this);
00150     return;
00151   }
00152 
00153   // Make sure the data begins with a sync byte:
00154   unsigned syncBytePosition;
00155   for (syncBytePosition = 0; syncBytePosition < fFrameSize; ++syncBytePosition) {
00156     if (fTo[syncBytePosition] == TRANSPORT_SYNC_BYTE) break;
00157   }
00158   if (syncBytePosition == fFrameSize) {
00159     envir() << "No Transport Stream sync byte in data.";
00160     handleClosure(this);
00161     return;
00162   } else if (syncBytePosition > 0) {
00163     // There's a sync byte, but not at the start of the data.  Move the good data
00164     // to the start of the buffer, then read more to fill it up again:
00165     memmove(fTo, &fTo[syncBytePosition], fFrameSize - syncBytePosition);
00166     fFrameSize -= syncBytePosition;
00167     fInputSource->getNextFrame(&fTo[fFrameSize], syncBytePosition,
00168                                afterGettingFrame, this,
00169                                FramedSource::handleClosure, this);
00170     return;
00171   } // else normal case: the data begins with a sync byte
00172 
00173   fPresentationTime = presentationTime;
00174 
00175   // Scan through the TS packets that we read, and update our estimate of
00176   // the duration of each packet:
00177   struct timeval tvNow;
00178   gettimeofday(&tvNow, NULL);
00179   double timeNow = tvNow.tv_sec + tvNow.tv_usec/1000000.0;
00180   for (unsigned i = 0; i < numTSPackets; ++i) {
00181     if (!updateTSPacketDurationEstimate(&fTo[i*TRANSPORT_PACKET_SIZE], timeNow)) {
00182       // We hit a preset limit (based on PCR) within the stream.  Handle this as if the input source has closed:
00183       handleClosure(this);
00184       return;
00185     }
00186   }
00187 
00188   fDurationInMicroseconds
00189     = numTSPackets * (unsigned)(fTSPacketDurationEstimate*1000000);
00190 
00191   // Complete the delivery to our client:
00192   afterGetting(this);
00193 }
00194 
00195 Boolean MPEG2TransportStreamFramer::updateTSPacketDurationEstimate(unsigned char* pkt, double timeNow) {
00196   // Sanity check: Make sure we start with the sync byte:
00197   if (pkt[0] != TRANSPORT_SYNC_BYTE) {
00198     envir() << "Missing sync byte!\n";
00199     return True;
00200   }
00201 
00202   ++fTSPacketCount;
00203 
00204   // If this packet doesn't contain a PCR, then we're not interested in it:
00205   u_int8_t const adaptation_field_control = (pkt[3]&0x30)>>4;
00206   if (adaptation_field_control != 2 && adaptation_field_control != 3) return True;
00207       // there's no adaptation_field
00208 
00209   u_int8_t const adaptation_field_length = pkt[4];
00210   if (adaptation_field_length == 0) return True;
00211 
00212   u_int8_t const discontinuity_indicator = pkt[5]&0x80;
00213   u_int8_t const pcrFlag = pkt[5]&0x10;
00214   if (pcrFlag == 0) return True; // no PCR
00215 
00216   // There's a PCR.  Get it, and the PID:
00217   ++fTSPCRCount;
00218   u_int32_t pcrBaseHigh = (pkt[6]<<24)|(pkt[7]<<16)|(pkt[8]<<8)|pkt[9];
00219   double clock = pcrBaseHigh/45000.0;
00220   if ((pkt[10]&0x80) != 0) clock += 1/90000.0; // add in low-bit (if set)
00221   unsigned short pcrExt = ((pkt[10]&0x01)<<8) | pkt[11];
00222   clock += pcrExt/27000000.0;
00223   if (fLimitTSPacketsToStreamByPCR) {
00224     if (clock > fPCRLimit) {
00225       // We've hit a preset limit within the stream:
00226       return False;
00227     }
00228   }
00229 
00230   unsigned pid = ((pkt[1]&0x1F)<<8) | pkt[2];
00231 
00232   // Check whether we already have a record of a PCR for this PID:
00233   PIDStatus* pidStatus = (PIDStatus*)(fPIDStatusTable->Lookup((char*)pid));
00234 
00235   if (pidStatus == NULL) {
00236     // We're seeing this PID's PCR for the first time:
00237     pidStatus = new PIDStatus(clock, timeNow);
00238     fPIDStatusTable->Add((char*)pid, pidStatus);
00239 #ifdef DEBUG_PCR
00240     fprintf(stderr, "PID 0x%x, FIRST PCR 0x%08x+%d:%03x == %f @ %f, pkt #%lu\n", pid, pcrBaseHigh, pkt[10]>>7, pcrExt, clock, timeNow, fTSPacketCount);
00241 #endif
00242   } else {
00243     // We've seen this PID's PCR before; update our per-packet duration estimate:
00244     int64_t packetsSinceLast = (int64_t)(fTSPacketCount - pidStatus->lastPacketNum);
00245       // it's "int64_t" because some compilers can't convert "u_int64_t" -> "double"
00246     double durationPerPacket = (clock - pidStatus->lastClock)/packetsSinceLast;
00247 
00248     // Hack (suggested by "Romain"): Don't update our estimate if this PCR appeared unusually quickly.
00249     // (This can produce more accurate estimates for wildly VBR streams.)
00250     double meanPCRPeriod = 0.0;
00251     if (fTSPCRCount > 0) {
00252       double tsPacketCount = (double)(int64_t)fTSPacketCount;
00253       double tsPCRCount = (double)(int64_t)fTSPCRCount;
00254       meanPCRPeriod = tsPacketCount/tsPCRCount;
00255       if (packetsSinceLast < meanPCRPeriod*PCR_PERIOD_VARIATION_RATIO) return True;
00256     }
00257 
00258     if (fTSPacketDurationEstimate == 0.0) { // we've just started
00259       fTSPacketDurationEstimate = durationPerPacket;
00260     } else if (discontinuity_indicator == 0 && durationPerPacket >= 0.0) {
00261       fTSPacketDurationEstimate
00262         = durationPerPacket*NEW_DURATION_WEIGHT
00263         + fTSPacketDurationEstimate*(1-NEW_DURATION_WEIGHT);
00264 
00265       // Also adjust the duration estimate to try to ensure that the transmission
00266       // rate matches the playout rate:
00267       double transmitDuration = timeNow - pidStatus->firstRealTime;
00268       double playoutDuration = clock - pidStatus->firstClock;
00269       if (transmitDuration > playoutDuration) {
00270         fTSPacketDurationEstimate *= TIME_ADJUSTMENT_FACTOR; // reduce estimate
00271       } else if (transmitDuration + MAX_PLAYOUT_BUFFER_DURATION < playoutDuration) {
00272         fTSPacketDurationEstimate /= TIME_ADJUSTMENT_FACTOR; // increase estimate
00273       }
00274     } else {
00275       // the PCR has a discontinuity from its previous value; don't use it now,
00276       // but reset our PCR and real-time values to compensate:
00277       pidStatus->firstClock = clock;
00278       pidStatus->firstRealTime = timeNow;
00279     }
00280 #ifdef DEBUG_PCR
00281     fprintf(stderr, "PID 0x%x, PCR 0x%08x+%d:%03x == %f @ %f (diffs %f @ %f), pkt #%lu, discon %d => this duration %f, new estimate %f, mean PCR period=%f\n", pid, pcrBaseHigh, pkt[10]>>7, pcrExt, clock, timeNow, clock - pidStatus->firstClock, timeNow - pidStatus->firstRealTime, fTSPacketCount, discontinuity_indicator != 0, durationPerPacket, fTSPacketDurationEstimate, meanPCRPeriod );
00282 #endif
00283   }
00284 
00285   pidStatus->lastClock = clock;
00286   pidStatus->lastRealTime = timeNow;
00287   pidStatus->lastPacketNum = fTSPacketCount;
00288 
00289   return True;
00290 }

Generated on Tue Jun 18 13:16:51 2013 for live by  doxygen 1.5.2