This is a simple RTMP library designed to be easy to use and easy to read. You can use it to build your own standalone RTMP server or a cluster.
You can also use this library to push or pull RTMP streams; refer to the pprtmp example in xiu/application/pprtmp.
use rtmp::channels::channels::ChannelsManager;
use rtmp::rtmp::RtmpServer;
/// Single-server example: creates a `ChannelsManager`, hands its session
/// event producer to an `RtmpServer` constructed with the address
/// `0.0.0.0:1935`, runs both on spawned tokio tasks, and then waits for
/// Ctrl-C before returning.
///
/// NOTE(review): `Result` and `signal` are not imported in this snippet —
/// presumably `anyhow::Result` and `tokio::signal`; confirm against the crate.
#[tokio::main]
async fn main() -> Result<()> {
    let mut channels = ChannelsManager::new();
    let event_producer = channels.get_session_event_producer();

    // Address handed to the RTMP server.
    let port_number = 1935;
    let listen_address = format!("0.0.0.0:{port}", port = port_number);

    let mut server = RtmpServer::new(listen_address, event_producer.clone());
    tokio::spawn(async move {
        // A server failure is only logged; it does not stop the process.
        match server.run().await {
            Ok(_) => {}
            Err(err) => {
                log::error!("rtmp server error: {}\n", err);
            }
        }
    });

    // The channels manager runs on its own task.
    tokio::spawn(async move { channels.run().await });

    // Keep the process alive until Ctrl-C is received.
    signal::ctrl_c().await?;
    Ok(())
}
use rtmp::channels::channels::ChannelsManager;
use rtmp::rtmp::RtmpServer;
use rtmp::relay::{pull_client::PullClient, push_client::PushClient},
#[tokio::main]
async fn main() -> Result<()> {
let mut channel = ChannelsManager::new();
let producer = channel.get_session_event_producer();
// push the rtmp stream from local to 192.168.0.2:1935
let address = format!(
"{ip}:{port}",
ip = "192.168.0.2",
port = 1935
);
let mut push_client = PushClient::new(
address,
channel.get_client_event_consumer(),
producer.clone(),
);
tokio::spawn(async move {
if let Err(err) = push_client.run().await {
log::error!("push client error {}\n", err);
}
});
channel.set_rtmp_push_enabled(true);
//pull the rtmp stream from 192.168.0.3:1935 to local
let address = format!(
"{ip}:{port}",
ip = "192.168.0.3",
port = pull_cfg_value.port
);
log::info!("start rtmp pull client from address: {}", address);
let mut pull_client = PullClient::new(
address,
channel.get_client_event_consumer(),
producer.clone(),
tokio::spawn(async move {
if let Err(err) = pull_client.run().await {
log::error!("pull client error {}\n", err);
}
});
channel.set_rtmp_pull_enabled(true);
// the local rtmp server
let listen_port = 1935;
let address = format!("0.0.0.0:{port}", port = listen_port);
let mut rtmp_server = RtmpServer::new(address, producer.clone());
tokio::spawn(async move {
if let Err(err) = rtmp_server.run().await {
log::error!("rtmp server error: {}\n", err);
}
});
tokio::spawn(async move { channel.run().await });
signal::ctrl_c().await?;
Ok(())
}
For a more detailed implementation, please refer to the xiu server.
Support RTMP publish and play
Support rtmp relay pull and static push
Add amf0 functions
Add timestamp for metadata
Support complex handshake
Refactor some code, update dependencies
Fix bugs;
Improve subscriber id;
Fix bugs;
Support cache GOP;
Refactor handshake mod;
Fix overflow error.[#17]
Add introduction and example code in the docs
Fix handshake error.[#23]
Update RTMP library version.
Support audio and video information statistics.
Support notify stream status.