mpeg2ts-reader

Rust reader for MPEG2 Transport Stream data

Build Status crates.io version Documentation Coverage Status Unstable API

Zero-copy access to payload data within an MPEG Transport Stream.

This crate, - implements a low-level state machine that recognises the structural elements of Transport Stream syntax - provides traits that you should implement to define your application-specific processing of the contained data.

Example

Dump H264 payload data as hex.

```rust

[macro_use]

extern crate mpeg2tsreader; extern crate hexslice;

use hexslice::AsHex; use mpeg2tsreader::demultiplex; use mpeg2tsreader::packet; use mpeg2tsreader::pes; use mpeg2tsreader::psi; use mpeg2tsreader::StreamType; use std::cmp; use std::env; use std::fs::File; use std::io::Read;

// This macro invocation creates an enum called DumpFilterSwitch, encapsulating all possible ways // that this application may handle transport stream packets. Each enum variant is just a wrapper // around an implementation of the PacketFilter trait packetfilterswitch!{ DumpFilterSwitch { // the DumpFilterSwitch::Pes variant will perform the logic actually specific to this // application, Pes: pes::PesPacketFilter,

    // these definitions are boilerplate required by the framework,
    Pat: demultiplex::PatPacketFilter<DumpDemuxContext>,
    Pmt: demultiplex::PmtPacketFilter<DumpDemuxContext>,

    // this variant will be used when we want to ignore data in the transport stream that this
    // application does not care about
    Null: demultiplex::NullPacketFilter<DumpDemuxContext>,
}

}

// This macro invocation creates a type called DumpDemuxContext, which is our application-specific // implementation of the DemuxContext trait. demux_context!(DumpDemuxContext, DumpStreamConstructor);

// When the de-multiplexing process needs to create a PacketFilter instance to handle a particular // kind of data discovered within the Transport Stream being processed, it will send a // FilterRequest to our application-specific implementation of the StreamConstructor trait pub struct DumpStreamConstructor; impl demultiplex::StreamConstructor for DumpStreamConstructor { type F = DumpFilterSwitch;

fn construct(&mut self, req: demultiplex::FilterRequest) -> Self::F {
    match req {
        // The 'Program Association Table' is is always on PID 0.  We just use the standard
        // handling here, but an application could insert its own logic if required,
        demultiplex::FilterRequest::ByPid(packet::Pid::PAT) => {
            DumpFilterSwitch::Pat(demultiplex::PatPacketFilter::default())
        }
        // 'Stuffing' data on PID 0x1fff may be used to pad-out parts of the transport stream
        // so that it has constant overall bitrate.  This causes it to be ignored if present.
        demultiplex::FilterRequest::ByPid(packet::Pid::STUFFING) => {
            DumpFilterSwitch::Null(demultiplex::NullPacketFilter::default())
        }
        // Some Transport Streams will contain data on 'well known' PIDs, which are not
        // announced in PAT / PMT metadata.  This application does not process any of these
        // well known PIDs, so we register NullPacketFiltet such that they will be ignored
        demultiplex::FilterRequest::ByPid(_) => {
            DumpFilterSwitch::Null(demultiplex::NullPacketFilter::default())
        }
        // This match-arm installs our application-specific handling for each H264 stream
        // discovered within the transport stream,
        demultiplex::FilterRequest::ByStream {
            stream_type: StreamType::H264,
            pmt,
            stream_info,
            ..
        } => PtsDumpElementaryStreamConsumer::construct(pmt, stream_info),
        // We need to have a match-arm to specify how to handle any other StreamType values
        // that might be present; we answer with NullPacketFilter so that anything other than
        // H264 (handled above) is ignored,
        demultiplex::FilterRequest::ByStream { .. } => {
            DumpFilterSwitch::Null(demultiplex::NullPacketFilter::default())
        }
        // The 'Program Map Table' defines the sub-streams for a particular program within the
        // Transport Stream (it is common for Transport Streams to contain only one program).
        // We just use the standard handling here, but an application could insert its own
        // logic if required,
        demultiplex::FilterRequest::Pmt {
            pid,
            program_number,
        } => DumpFilterSwitch::Pmt(demultiplex::PmtPacketFilter::new(pid, program_number)),
        // Ignore 'Network Information Table', if present,
        demultiplex::FilterRequest::Nit { .. } => {
            DumpFilterSwitch::Null(demultiplex::NullPacketFilter::default())
        }
    }
}

}

// Implement the ElementaryStreamConsumer to just dump and PTS/DTS timestamps to stdout pub struct PtsDumpElementaryStreamConsumer { pid: packet::Pid, len: Option, } impl PtsDumpElementaryStreamConsumer { fn construct( pmtsect: &psi::pmt::PmtSection, streaminfo: &psi::pmt::StreamInfo, ) -> DumpFilterSwitch { let filter = pes::PesPacketFilter::new(PtsDumpElementaryStreamConsumer { pid: streaminfo.elementarypid(), len: None, }); DumpFilterSwitch::Pes(filter) } } impl pes::ElementaryStreamConsumer for PtsDumpElementaryStreamConsumer { fn startstream(&mut self) {} fn beginpacket(&mut self, header: pes::PesHeader) { match header.contents() { pes::PesContents::Parsed(Some(parsed)) => { match parsed.ptsdts() { Ok(pes::PtsDts::PtsOnly(Ok(pts))) => { print!("{:?}: pts {:#08x} ", self.pid, pts.value()) } Ok(pes::PtsDts::Both { pts: Ok(pts), dts: Ok(dts), }) => print!( "{:?}: pts {:#08x} dts {:#08x} ", self.pid, pts.value(), dts.value() ), _ => (), } let payload = parsed.payload(); self.len = Some(payload.len()); println!( "{:02x}", payload[..cmp::min(payload.len(), 16)].plainhex(false) ) } pes::PesContents::Parsed(None) => (), pes::PesContents::Payload(payload) => { self.len = Some(payload.len()); println!( "{:?}: {:02x}", self.pid, payload[..cmp::min(payload.len(), 16)].plainhex(false) ) } } } fn continuepacket(&mut self, data: &[u8]) { println!( "{:?}: continues {:02x}", self.pid, data[..cmp::min(data.len(), 16)].plainhex(false) ); self.len = self.len.map(|l| l + data.len()); } fn endpacket(&mut self) { println!("{:?}: end of packet length={:?}", self.pid, self.len); } fn continuityerror(&mut self) {} }

fn main() { // open input file named on command line, let name = env::args().nth(1).unwrap(); let mut f = File::open(&name).expect(&format!("file not found: {}", &name));

// create the context object that stores the state of the transport stream demultiplexing
// process
let mut ctx = DumpDemuxContext::new(DumpStreamConstructor);

// create the demultiplexer, which will use the ctx to create a filter for pid 0 (PAT)
let mut demux = demultiplex::Demultiplex::new(&mut ctx);

// consume the input file,
let mut buf = [0u8; 188 * 1024];
loop {
    match f.read(&mut buf[..]).expect("read failed") {
        0 => break,
        n => demux.push(&mut ctx, &buf[0..n]),
    }
}

} ```

Performance

On my laptop (which can read sequentially from main memory at around 16GiByte/s), a microbenchmark that parses TS structure, but ignores the audio and video contained within, can process at a rate of 10 GiBytes/s (80 Gibits/s).

Real usage that actually processes the contents of the stream will of course be slower!

The conditions of the test are, * the data is already in memory (no network/disk access) * test dataset is larger than CPU cache * processing is happening on a single core (no multiprocessing of the stream).

Perf shoot-out

Comparing this crate to a couple of others which you might use to read a Transport Stream -- mpeg2ts and ffmpg-sys:

Performance

The benchmarks producing the above chart data are in the shootout folder. (If the benchmarks are giving an unfair representation of relative performance, that's a mistake -- please raise a bug!)

Supported Transport Stream features

Not all Transport Stream features are supported yet. Here's a summary of what's available, and what's yet to come: