google-cloud-pubsub

Google Cloud Platform pub/sub library.

crates.io

Installation

```toml
[dependencies]
google-cloud-pubsub = <version>
google-cloud-default = { version = <version>, features = ["pubsub"] }
```

Quickstart

Authentication

There are two ways to create a client that is authenticated against Google Cloud. The crate `google-cloud-default` provides two methods that help to implement them.

Automatically

The function `with_auth()` will try to read the credentials from a file specified in the environment variable `GOOGLE_APPLICATION_CREDENTIALS`, from the environment variable `GOOGLE_APPLICATION_CREDENTIALS_JSON`, or from a metadata server.

This is also described in google-cloud-auth

```rust
use google_cloud_pubsub::client::{ClientConfig, Client};
use google_cloud_default::WithAuthExt;

/// Creates a Pub/Sub client from a default configuration that has been
/// authenticated with `with_auth()`.
///
/// Both steps are unwrapped, so a failure to authenticate or to build the
/// client panics; this keeps the example short.
async fn run() {
    let config = ClientConfig::default().with_auth().await.unwrap();
    let client = Client::new(config).await.unwrap();
}
```

Manually

When you can't use the gcloud authentication but you have a different way to get your credentials (e.g. a different environment variable), you can parse your own version of the 'credentials-file' and use it like this:

```rust
use google_cloud_auth::credentials::CredentialsFile;
use google_cloud_pubsub::client::{ClientConfig, Client};
use google_cloud_default::WithAuthExt;

/// Creates a Pub/Sub client from credentials supplied by the caller.
///
/// `cred` is an already-parsed credentials file; it is handed to
/// `with_credentials()` in place of the automatic lookup done by `with_auth()`.
/// Both steps are unwrapped, so any failure panics.
async fn run(cred: CredentialsFile) {
    let config = ClientConfig::default().with_credentials(cred).await.unwrap();
    let client = Client::new(config).await.unwrap();
}
```

Emulator

For tests, you can use the emulator option as follows: before executing the program, specify the address of the emulator in the following environment variable.

```sh
export PUBSUB_EMULATOR_HOST=localhost:8681
```

Publish Message

``` use googlecloudpubsub::client::{Client, ClientConfig}; use googlecloudgoogleapis::pubsub::v1::PubsubMessage; use googlecloudpubsub::topic::TopicConfig; use googlecloudpubsub::subscription::SubscriptionConfig; use googlecloudgax::grpc::Status; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken;

async fn run(config: ClientConfig) -> Result<(), Status> {

// Create pubsub client.
let client = Client::new(config).await.unwrap();

// Create topic.
let topic = client.topic("test-topic");
if !topic.exists(None).await? {
    topic.create(None, None).await?;
}

// Start publisher.
let publisher = topic.new_publisher(None);

// Publish message.
let tasks : Vec<JoinHandle<Result<String,Status>>> = (0..10).into_iter().map(|_i| {
    let publisher = publisher.clone();
    tokio::spawn(async move {
        let msg = PubsubMessage {
           data: "abc".into(),
           // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering)
           ordering_key: "order".into(),
           ..Default::default()
        };

        // Send a message. There are also `publish_bulk` and `publish_immediately` methods.
        let mut awaiter = publisher.publish(msg).await;

        // The get method blocks until a server-generated ID or an error is returned for the published message.
        awaiter.get().await
    })
}).collect();

// Wait for all publish task finish
for task in tasks {
    let message_id = task.await.unwrap()?;
}

// Wait for publishers in topic finish.
let mut publisher = publisher;
publisher.shutdown();

Ok(())

} ```

Subscribe Message

``` use googlecloudpubsub::client::{Client, ClientConfig}; use googlecloudgoogleapis::pubsub::v1::PubsubMessage; use googlecloudpubsub::subscription::SubscriptionConfig; use googlecloudgax::grpc::Status; use std::time::Duration; use tokioutil::sync::CancellationToken; use futuresutil::StreamExt;

async fn run(config: ClientConfig) -> Result<(), Status> {

// Create pubsub client.
let client = Client::new(config).await.unwrap();

// Get the topic to subscribe to.
let topic = client.topic("test-topic");

// Create subscription
// If subscription name does not contain a "/", then the project is taken from client above. Otherwise, the
// name will be treated as a fully qualified resource name
let config = SubscriptionConfig {
    // Enable message ordering if needed (https://cloud.google.com/pubsub/docs/ordering)
    enable_message_ordering: true,
    ..Default::default()
};

// Create subscription
let subscription = client.subscription("test-subscription");
if !subscription.exists(None).await? {
    subscription.create(topic.fully_qualified_name(), config, None).await?;
}

// Token for cancel.
let cancel = CancellationToken::new();
let cancel2 = cancel.clone();
tokio::spawn(async move {
    // Cancel after 10 seconds.
    tokio::time::sleep(Duration::from_secs(10)).await;
    cancel2.cancel();
});

// Receive blocks until the ctx is cancelled or an error occurs.
// Or simply use the `subscription.subscribe` method.
subscription.receive(|mut message, cancel| async move {
    // Handle data.
    println!("Got Message: {:?}", message.message.data);

    // Ack or Nack message.
    let _ = message.ack().await;
}, cancel.clone(), None).await?;

// Delete subscription if needed.
subscription.delete(None).await?;

Ok(())

} ```

Subscribe Message (Alternative Way)

```no_run
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::subscription::SubscriptionConfig;
use google_cloud_gax::grpc::Status;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use futures_util::StreamExt;

/// Reads messages from `test-subscription` as a stream and acknowledges each
/// one, returning once the stream yields no further message.
///
/// Returns the `Status` produced if opening the stream fails.
/// Client construction is unwrapped and panics on failure.
async fn run(config: ClientConfig) -> Result<(), Status> {
    // Creating Client, Topic and Subscription...
    let client = Client::new(config).await.unwrap();
    let subscription = client.subscription("test-subscription");

    // Read the messages as a stream
    // (needs futures_util::StreamExt as import)
    // Note: This blocks the current thread but helps when working with non-clonable data
    let mut stream = subscription.subscribe(None).await?;
    while let Some(message) = stream.next().await {
        // Handle data.
        println!("Got Message: {:?}", message.message);

        // Ack or Nack message. The result is deliberately ignored here.
        let _ = message.ack().await;
    }
    Ok(())
}
```