parside

parside codecov

Parser library for Composable Event Streaming Representation (CESR).

This library is currently under construction.

Example

Example parsing code (top level ACDC, KEL and TEL parsing):

```rust fn verifyacdc(&self, creder: &Creder, quadlets: &AttachedMaterialQuadlets, deep: Option) -> Result { if creder.status()?.isnone() { return err!(Error::Validation); };

    let vcid = creder.said()?;
    let schema = creder.schema()?;
    let prov = creder.chains()?;

    let event = self.store.get_latest_transaction_event(&vcid)?;
    let state = Serder::new_with_raw(event.as_bytes())?;
    let dtnow = chrono::Utc::now();
    let dte = chrono::DateTime::parse_from_rfc3339(&state.ked()["dt"].to_string()?)?.with_timezone(&chrono::Utc);
    if (dtnow - dte).num_seconds() > Self::DEFAULT_CREDENTIAL_EXPIRY_SECONDS {
        return err!(Error::Validation);
    }

    // added brv here for safety even though unimplemented
    if [Ilkage::rev, Ilkage::brv].contains(&state.ked()[Ids::t].to_string()?.as_str()) {
        return err!(Error::Validation);
    }

    schema_cache().verify(&schema, std::str::from_utf8(&creder.raw())?)?;

    let mut rooted = false;

    for group in quadlets.value() {
        match group {
            CesrGroup::SadPathSigVariant { value } => {
                for sad_path_sig in value.value() {
                    if sad_path_sig.pather.bext()? != "-" {
                        continue;
                    }

                    rooted = true;

                    let event = self.store.get_key_event(&sad_path_sig.prefixer.qb64()?, sad_path_sig.seqner.sn()? as u32)?;
                    let serder = Serder::new_with_raw(event.as_bytes())?;
                    if serder.said()? != sad_path_sig.saider.qb64()? {
                        return err!(Error::Verification)
                    }

                    let verfers = serder.verfers()?;
                    let mut sigers = vec![];
                    for controller_idx_sig in sad_path_sig.sigers.value() {
                        let siger = &controller_idx_sig.siger;
                        if !sigers.contains(siger) {
                            sigers.push(siger.clone())
                        }
                    }

                    let mut verified_indices = vec![];
                    for siger in sigers {
                        if siger.index() as usize > verfers.len() {
                            return err!(Error::Verification);
                        }

                        if verfers[siger.index() as usize].verify(&siger.raw(), &creder.raw())? {
                            verified_indices.push(siger.index());
                        }
                    }

                    if let Some(tholder) = serder.tholder()? {
                        if !tholder.satisfy(&verified_indices)? {
                            return err!(Error::Verification);
                        }
                    } else {
                        return err!(Error::Verification);
                    }
                }                        
            },
            _ => return err!(Error::Decoding)
        }
    }

    if !rooted {
        return err!(Error::Verification);
    }

    let edges = if prov.to_map().is_ok() {
        vec![prov]
    } else if prov.to_vec().is_ok() {
        prov.to_vec()?
    } else {
        return err!(Error::Verification);
    };

    for edge in &edges {
        for (label, node) in edge.to_map()? {
            if [Ids::d, "o"].contains(&label.as_str()) {
                continue;
            }

            let node_said = &node["n"].to_string()?;
            let message = self.store.get_acdc(&node_said)?;

            // here we need to default to true
            if deep.unwrap_or(true) {
                let pacdc = Creder::new_with_raw(message.as_bytes())?;

                let (_, message_list) = MessageList::from_stream_bytes(message[pacdc.raw().len()..].as_bytes())?;
                let group = message_list.messages[0].cesr_group()?;

                match group {
                    CesrGroup::AttachedMaterialQuadletsVariant { value } => {
                        self.verify_acdc(&pacdc, value, deep)?;
                    },
                    _ => return err!(Error::Decoding)
                }
            }
        }
    }

    let result = self.store.get_acdc(&vcid);
    let existing = result.is_ok();
    if existing {
        let message = result.unwrap();
        let eacdc = Creder::new_with_raw(message.as_bytes())?;

        // this seems very bad, it means something is in the database that shouldn't be there. how did it get there?
        if vcid != eacdc.said()? {
            return err!(Error::Programmer);
        }
    }

    println!("succesfully verified acdc {vcid}");

    Ok(existing)
}

fn verify_key_event(&self, serder: &Serder, quadlets: &AttachedMaterialQuadlets, deep: Option<bool>) -> Result<bool> {
    let pre = serder.pre()?;
    let ked = serder.ked();

    // see if code is supported
    let prefixer = Prefixer::new_with_qb64(&pre)?;

    let sn = serder.sn()?;
    let ilk = ked["t"].to_string()?;
    let said = serder.said()?;

    let mut existing = false;

    let (verfers, tholder) = if serder.est()? {
        let tholder = if let Some(tholder) = serder.tholder()? {
            tholder
        } else {
            return err!(Error::Decoding);
        };

        (serder.verfers()?, tholder)
    } else {
        let (raw, _) = self.store.get_latest_establishment_event_as_of_sn(&pre, sn as u32 - 1)?;
        let serder = Serder::new_with_raw(raw.as_bytes())?;

        let tholder = if let Some(tholder) = serder.tholder()? {
            tholder
        } else {
            return err!(Error::Decoding);
        };

        (serder.verfers()?, tholder)
    };

    let mut verified_indices = vec![0u32; 0];
    for group in quadlets.value() {
        match group {
            CesrGroup::ControllerIdxSigsVariant { value } => {
                for controller_idx_sig in value.value() {
                    let siger = &controller_idx_sig.siger;

                    if siger.index() as usize > verfers.len() {
                        return err!(Error::Verification);
                    }

                    if verfers[siger.index() as usize].verify(&siger.raw(), &serder.raw())? {
                        verified_indices.push(siger.index());
                    }
                }
            }
            _ => return err!(Error::Decoding)
        }
    }

    if !tholder.satisfy(&verified_indices)? {
        return err!(Error::Verification);
    }

    let labels = match ilk.as_str() {
        Ilkage::icp => &Self::ICP_LABELS,
        Ilkage::rot => &Self::ROT_LABELS,
        Ilkage::ixn => &Self::IXN_LABELS,
        _ => return err!(Error::Decoding),
    };
    if !Self::verify_ked_labels(&ked, labels)? {
        return err!(Error::Validation);
    }

    let inceptive = &ilk == Ilkage::icp;
    let key_event_count = self.store.count_key_events(&pre)?;
    if inceptive {
        if !prefixer.verify(&serder.ked(), Some(true))? {
            return err!(Error::Verification);
        }

        if sn != 0 {
            // must be 0
            return err!(Error::Decoding);
        }

        if said != serder.pre()? {
            return err!(Error::Verification);
        }

        if key_event_count != 0 {
            existing = true;
        }
    } else {
        if !serder.saider().verify(&serder.ked(), Some(inceptive), Some(true), None, None, None)? {
            return err!(Error::Verification);
        }

        if sn < 1 {
            return err!(Error::Validation);
        }

        let sno = key_event_count as u128;

        if sn > sno {
            // escrow here
            return err!(Error::OutOfOrder);
        }

        if sn != sno {
            existing = true;
        }

        // this sn implementation will become a problem at around 4 billion events
        let event = self.store.get_key_event(&pre, sn as u32 - 1)?;
        let pserder = Serder::new_with_raw(event.as_bytes())?;
        if pserder.said()? != serder.ked()["p"].to_string()? {
            return err!(Error::Verification);
        }

        if deep.unwrap_or(false) {
            let (_, message_list) = MessageList::from_stream_bytes(event[pserder.raw().len()..].as_bytes())?;
            let group = message_list.messages[0].cesr_group()?;

            match group {
                CesrGroup::AttachedMaterialQuadletsVariant { value } => {
                    self.verify_key_event(&pserder, value, deep)?;
                },
                _ => return err!(Error::Decoding)
            }
        }
    }

    if existing {
        let event = self.store.get_key_event(&pre, sn as u32)?;
        let eserder = Serder::new_with_raw(event.as_bytes())?;

        // this seems very bad, it means something is in the database that shouldn't be there. how did it get there?
        if said != eserder.said()? {
            return err!(Error::Programmer);
        }
    }

    println!("succesfully verified key event [{pre}, {said}]");

    Ok(existing)
}

fn verify_transaction_event(&self, serder: &Serder, seal_source_couples: &SealSourceCouples, deep: Option<bool>) -> Result<bool> {
    let pre = serder.pre()?;
    let ked = serder.ked();

    // see if code is supported
    let prefixer = Prefixer::new_with_qb64(&pre)?;

    let sn = serder.sn()?;
    let ilk = ked["t"].to_string()?;
    let said = serder.said()?;
    let ri = Self::extract_registry_from_serder(&ilk, &serder)?;

    let mut existing = false;

    if seal_source_couples.value.len() != 1 {
        return err!(Error::Decoding);
    }
    let source_saider = &seal_source_couples.value()[0].saider;
    let source_seqner = &seal_source_couples.value()[0].seqner;

    let labels = match ilk.as_str() {
        Ilkage::vcp => &Self::VCP_LABELS,
        Ilkage::iss => &Self::ISS_LABELS,
        Ilkage::rev => &Self::REV_LABELS,
        _ => return err!(Error::Decoding),
    };
    if !Self::verify_ked_labels(&ked, labels)? {
        return err!(Error::Validation);
    }

    let inceptive = &ilk == Ilkage::vcp || &ilk == Ilkage::iss;
    let transaction_event_count = self.store.count_transaction_events(&pre)?;

    let apre = match ilk.as_str() {
        Ilkage::vcp => {
            if !prefixer.verify(&ked, Some(true))? {
                return err!(Error::Verification);
            }

            ked["ii"].to_string()?
        },
        Ilkage::iss | Ilkage::rev => {
            if !serder.saider().verify(&ked, Some(false), Some(true), None, None, None)? {
                return err!(Error::Verification);
            }

            let rievent = self.store.get_transaction_event(&ri, 0)?;
            let riserder = Serder::new_with_raw(rievent.as_bytes())?;

            riserder.ked()["ii"].to_string()?
        },
        _ => return err!(Error::Decoding)
    };

    if !self.verify_anchor(&apre, source_seqner, source_saider, serder, deep)? {
        return err!(Error::Verification);
    }

    if inceptive {
        if sn != 0 {
            // must be 0
            return err!(Error::Decoding);
        }

        if transaction_event_count != 0 {
            existing = true;
        }
    } else {
        if sn < 1 {
            return err!(Error::Validation);
        }

        let sno = transaction_event_count as u128;

        if sn > sno {
            // escrow here
            return err!(Error::OutOfOrder);
        }

        if sn != sno {
            existing = true;
        }

        // this sn implementation will become a problem at around 4 billion events
        let event = self.store.get_transaction_event(&pre, sn as u32 - 1)?;
        let pserder = Serder::new_with_raw(event.as_bytes())?;

        if deep.unwrap_or(false) {
            let (_, message_list) = MessageList::from_stream_bytes(event[pserder.raw().len()..].as_bytes())?;
            let group = message_list.messages[0].cesr_group()?;

            match group {
                CesrGroup::SealSourceCouplesVariant { value } => {
                    self.verify_transaction_event(&pserder, value, deep)?;
                },
                _ => return err!(Error::Decoding)
            }
        }

        if pserder.said()? != serder.ked()["p"].to_string()? {
            return err!(Error::Verification);
        }
    }

    if existing {
        let event = self.store.get_transaction_event(&pre, sn as u32)?;
        let eserder = Serder::new_with_raw(event.as_bytes())?;

        // this seems very bad, it means something is in the database that shouldn't be there. how did it get there?
        if said != eserder.said()? {
            return err!(Error::Programmer);
        }
    }

    println!("succesfully verified transaction event [{pre}, {said}]");

    Ok(existing)
}

pub fn ingest_messages(&mut self, messages: &str, deep: Option<bool>) -> Result<()> {
    let (_, message_list) = MessageList::from_stream_bytes(messages.as_bytes())?;
    let mut messages = message_list.messages.iter();

    loop {
        let sadder = messages.next();
        if let Some(sadder) = sadder {
            let payload = sadder.payload()?;
            let raw_string = payload.value.to_string();
            let raw_message = raw_string.as_bytes();
            let result = cesride::common::sniff(raw_message)?;

            if result.ident == Identage::KERI {
                let serder = Serder::new_with_raw(raw_message)?;

                let message = messages.next();
                if let Some(message) = message {
                    let group = message.cesr_group()?;
                    match serder.ked()["t"].to_string()?.as_str() {
                        Ilkage::icp | Ilkage::rot | Ilkage::ixn => {                
                            match group {
                                CesrGroup::AttachedMaterialQuadletsVariant { value } => {
                                    let existing = self.verify_key_event(&serder, value, deep)?;
                                    if !existing {
                                        let event = String::from_utf8(serder.raw())? + &group.qb64()?;
                                        self.store.insert_key_event(&serder.pre()?, &event)?;
                                    }
                                },
                                _ => return err!(Error::Decoding), // we only accept pipelined input at present
                            }
                        },
                        Ilkage::vcp | Ilkage::iss | Ilkage::rev => {
                            match group {
                                CesrGroup::SealSourceCouplesVariant { value } => {
                                    let existing = self.verify_transaction_event(&serder, value, deep)?;
                                    if !existing {
                                        let event = String::from_utf8(serder.raw())? + &group.qb64()?;
                                        self.store.insert_transaction_event(&serder.pre()?, &event)?;
                                    }
                                },
                                _ => return err!(Error::Decoding),
                            }
                        }
                        _ => return err!(Error::Decoding),
                    }
                } else {
                    return err!(Error::Decoding);
                }
            } else if result.ident == Identage::ACDC {
                let creder = Creder::new_with_raw(raw_message)?;

                let message = messages.next();
                if let Some(message) = message {
                    let group = message.cesr_group()?;
                    match group {
                        CesrGroup::AttachedMaterialQuadletsVariant { value } => {
                            let existing = self.verify_acdc(&creder, value, deep)?;
                            if !existing {
                                let acdc = String::from_utf8(creder.raw())? + &group.qb64()?;
                                self.store.insert_acdc(&creder.said()?, &acdc)?;
                            }
                        },
                        _ => return err!(Error::Decoding), // we only accept pipelined input at present
                    };
                } else {
                    return err!(Error::Decoding);
                }
            } else {
                return err!(Error::Decoding)
            }
        } else {
            break;
        }
    }

    self.encrypt_and_cache_storage()
}

```

Community

Parside work currently resides alongside the cesride work.

Bi-weekly Meeting

Important Reference Material