Documentation crates.io

# actix-web-buffering

Request/response body buffering with support for spooling to a temporary file on disk.

Use it in actix-web middleware. For example, it is used in actix-web-detached-jws-middleware.

Example:

```rust
use std::{
    cell::RefCell,
    pin::Pin,
    rc::Rc,
    sync::Arc,
    task::{Context, Poll},
};

use actix_service::Transform;
use actix_web::{
    dev::{Body, Service, ServiceRequest, ServiceResponse},
    web,
    web::BytesMut,
    App, Error, HttpMessage, HttpResponse, HttpServer, Responder,
};
use actix_web_buffering::{
    enable_request_buffering, enable_response_buffering, FileBufferingStreamWrapper,
};
use futures::{
    future::{ok, Ready},
    stream::StreamExt,
    Future, FutureExt,
};

[actix_web::main]

async fn main() -> std::io::Result<()> { let wrapper = FileBufferingStreamWrapper::new() .tmpdir(std::env::tempdir()) .threshold(1024 * 30) .producechunksize(1024 * 30) .buffer_limit(Some(1024 * 30 * 10));

let wrapper = Arc::new(wrapper);

HttpServer::new(move || {
    let r1 = Arc::clone(&wrapper);
    let r2 = Arc::clone(&wrapper);
    App::new()
        .wrap(Example(r1))
        .wrap(Example(r2))
        .service(web::resource("/").route(web::post().to(echo)))
})
.bind("127.0.0.1:8080")?
.run()
.await

}

/// Handler that responds with `200 OK` and sends the request body back unchanged.
async fn echo(req_body: String) -> impl Responder {
    let mut response = HttpResponse::Ok();
    response.body(req_body)
}

struct Example(Arc);

impl Transform for Example where S: Service, Error = Error> + 'static, { type Request = ServiceRequest; type Response = ServiceResponse; type Error = Error; type InitError = (); type Transform = ExampleMiddleware; type Future = Ready>;

fn new_transform(&self, service: S) -> Self::Future {
    ok(ExampleMiddleware {
        service: Rc::new(RefCell::new(service)),
        wrapper: Arc::clone(&self.0),
    })
}

} pub struct ExampleMiddleware { service: Rc>, wrapper: Arc, }

impl Service for ExampleMiddleware where S: Service, Error = Error> + 'static, { type Request = ServiceRequest; type Response = ServiceResponse; type Error = Error; type Future = Pin>>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    self.service.poll_ready(cx)
}

fn call(&mut self, mut req: ServiceRequest) -> Self::Future {
    let mut svc = self.service.clone();
    let wrapper = self.wrapper.clone();

    async move {
        enable_request_buffering(&wrapper, &mut req);

        let mut stream = req.take_payload();
        let mut body = BytesMut::new();
        while let Some(chunk) = stream.next().await {
            body.extend_from_slice(&chunk.unwrap());
        }
        req.set_payload(stream);
        println!("request body: {:?}", body);

        let svc_res = svc.call(req).await?;

        let mut svc_res = enable_response_buffering(&wrapper, svc_res);

        let mut stream = svc_res.take_body();
        let mut body = BytesMut::new();
        while let Some(chunk) = stream.next().await {
            body.extend_from_slice(&chunk.unwrap());
        }
        let svc_res = svc_res.map_body(|_, _| stream);
        println!("response body: {:?}", body);

        Ok(svc_res)
    }
    .boxed_local()
}

} ```