Document Processing Pipeline
A complete example demonstrating actors, AI integration, supervision, and parallel processing.
// Types
struct Document {
    id: String,
    content: String,
    metadata: Map<String, String>
}

struct ProcessedDoc {
    id: String,
    summary: String,
    entities: List<Entity>,
    sentiment: Sentiment,
    embedding: Vector<f64, 1536>
}

struct Entity {
    text: String,
    entity_type: String,
    confidence: f64
}

enum Sentiment { Positive, Neutral, Negative }
// Document Processor Actor
actor DocumentProcessor {
    var processed_count: i64 = 0
    var current_doc: Option<Document> = None

    receive Process(doc: Document) -> ProcessedDoc {
        // Track current document for recovery
        current_doc = Some(doc)
        checkpoint()

        // Parallel AI operations
        let (summary, entities, sentiment, embedding) = await parallel(
            ai::complete("Summarize in 2-3 sentences: {doc.content}"),
            ai::extract<List<Entity>>("Extract named entities: {doc.content}"),
            ai::classify<Sentiment>(doc.content),
            ai::embed(doc.content)
        )

        processed_count += 1
        current_doc = None

        ProcessedDoc {
            id: doc.id,
            summary,
            entities,
            sentiment,
            embedding
        }
    }

    receive GetStats -> i64 {
        processed_count
    }

    // Resume processing after crash
    on_resume() {
        if let Some(doc) = current_doc {
            print("Resuming processing of {doc.id}")
        }
    }
}
// Storage Actor
actor DocumentStorage {
    var documents: Map<String, ProcessedDoc> = {}
    var embeddings: Map<String, Vector<f64, 1536>> = {}

    receive Store(doc: ProcessedDoc) {
        documents.insert(doc.id, doc)
        embeddings.insert(doc.id, doc.embedding)
        checkpoint()
    }

    receive Search(query: String, limit: i64) -> List<ProcessedDoc> {
        // Embed the query, then rank stored documents by similarity
        let query_embedding = await ai::embed(query)
        embeddings
            .entries()
            .map((id, emb) => (id, ai::similarity(query_embedding, emb)))
            .sort_by((_, score) => score, descending)
            .take(limit)
            .map((id, _) => documents[id])
    }

    receive Get(id: String) -> Option<ProcessedDoc> {
        documents.get(id)
    }
}
// Supervision tree
supervisor PipelineSupervisor {
    strategy: OneForOne,
    max_restarts: 5,
    within: Duration::minutes(1),
    children: [
        child(DocumentProcessor, restart: Always),
        child(DocumentStorage, restart: Always)
    ]
}
// Main entry point
fn main() {
    let supervisor = spawn PipelineSupervisor
    let processor = supervisor.get_child(DocumentProcessor)
    let storage = supervisor.get_child(DocumentStorage)

    // Process documents
    let doc = Document {
        id: "doc-001",
        content: "Simplex is a new programming language...",
        metadata: {}
    }
    let processed = ask(processor, Process(doc))
    send(storage, Store(processed))

    // Search documents
    let results = ask(storage, Search("programming languages", 5))
    print("Found {results.len()} matching documents")
}
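The same pattern scales to batches: loop over the documents and reuse the `ask`/`send` calls from `main`. A minimal sketch; `process_batch` is a hypothetical helper, not part of the example above.

// Batch processing (sketch)
fn process_batch(
    processor: ActorRef<DocumentProcessor>,
    storage: ActorRef<DocumentStorage>,
    docs: List<Document>
) {
    for doc in docs {
        let processed = ask(processor, Process(doc))
        send(storage, Store(processed))
    }
}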
AI Chatbot with Memory
A conversational AI chatbot that maintains context across messages.
struct Message {
    role: Role,
    content: String
}

enum Role { User, Assistant, System }

actor Chatbot {
    var history: List<Message> = []
    var system_prompt: String

    fn new(system: String) -> Self {
        Chatbot {
            history: [Message { role: Role::System, content: system }],
            system_prompt: system
        }
    }

    receive Chat(user_message: String) -> String {
        // Add user message to history
        history.push(Message {
            role: Role::User,
            content: user_message
        })

        // Build prompt from history
        let prompt = build_prompt(history)

        // Get AI response
        let response = await ai::complete(prompt)

        // Add assistant response to history
        history.push(Message {
            role: Role::Assistant,
            content: response
        })

        // Persist conversation state
        checkpoint()
        response
    }

    receive Reset {
        history = [Message {
            role: Role::System,
            content: system_prompt
        }]
        checkpoint()
    }

    receive GetHistory -> List<Message> {
        history
    }
}
fn build_prompt(messages: List<Message>) -> String {
    messages
        .map(m => "{m.role}: {m.content}")
        .join("\n")
}
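Putting the chatbot to work only requires spawning it and sending `Chat` messages. A minimal usage sketch; how `spawn` interacts with the `new` constructor is an assumption here.

// Usage sketch (constructor-spawn syntax assumed)
fn main() {
    let bot = spawn Chatbot::new("You are a helpful assistant.")
    let reply = ask(bot, Chat("What is Simplex?"))
    print(reply)
    send(bot, Reset)
}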
RAG (Retrieval-Augmented Generation)
A knowledge-base chatbot that retrieves relevant context before generating responses.
struct KnowledgeChunk {
    id: String,
    content: String,
    embedding: Vector<f64, 1536>,
    source: String
}

actor KnowledgeBase {
    var chunks: List<KnowledgeChunk> = []

    receive Ingest(content: String, source: String) {
        // Split into chunks
        let text_chunks = split_into_chunks(content, 500)

        // Embed all chunks in parallel
        let embeddings = await ai::embed_batch(text_chunks)

        // Store chunks
        for (text, embedding) in text_chunks.zip(embeddings) {
            chunks.push(KnowledgeChunk {
                id: generate_id(),
                content: text,
                embedding,
                source
            })
        }
        checkpoint()
    }

    receive Search(query: String, k: i64) -> List<KnowledgeChunk> {
        let query_embedding = await ai::embed(query)
        chunks
            .map(c => (c, ai::similarity(query_embedding, c.embedding)))
            .sort_by((_, score) => score, descending)
            .take(k)
            .map((chunk, _) => chunk)
    }
}
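The actor above calls `split_into_chunks` and `generate_id` without defining them. One plausible shape for each, stated as an assumption: the chunker here splits on whitespace into fixed-size word windows (production chunkers usually respect sentence or paragraph boundaries), and the string methods and `uuid` module it uses are hypothetical.

// Helper sketches (hypothetical implementations)
fn split_into_chunks(content: String, chunk_size: i64) -> List<String> {
    // Naive fixed-size chunking by word count
    content
        .split_whitespace()
        .chunks(chunk_size)
        .map(words => words.join(" "))
}

fn generate_id() -> String {
    // Any unique-id scheme works; a UUID is assumed
    uuid::v4()
}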
actor RAGChatbot {
    var kb: ActorRef<KnowledgeBase>

    fn new(knowledge_base: ActorRef<KnowledgeBase>) -> Self {
        RAGChatbot { kb: knowledge_base }
    }

    receive Ask(question: String) -> String {
        // Retrieve relevant context
        let relevant_chunks = ask(kb, Search(question, 3))

        // Build context string
        let context = relevant_chunks
            .map(c => "Source: {c.source}\n{c.content}")
            .join("\n\n")

        // Generate answer with context
        let prompt = "
            Based on the following context, answer the question.
            Context:
            {context}
            Question: {question}
            Answer:"
        await ai::complete(prompt)
    }
}
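Wiring the two actors together follows the same pattern as the pipeline example. A sketch; `read_file` and the constructor-spawn syntax are assumptions.

// Usage sketch
fn main() {
    let kb = spawn KnowledgeBase
    send(kb, Ingest(read_file("handbook.md"), "handbook.md"))

    let bot = spawn RAGChatbot::new(kb)
    let answer = ask(bot, Ask("What is the vacation policy?"))
    print(answer)
}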
Cognitive Hive Example
A complete CHAI hive for document analysis with multiple specialists.
// Specialist: Summarization
specialist Isto { // "Knowledge" in Sindarin
    model: "summarization-7b",
    domain: "text summarization",
    memory: 8.GB,
    temperature: 0.3,

    receive Summarize(text: String) -> String {
        infer("Summarize the following text concisely:\n\n{text}")
    }
}

// Specialist: Entity Extraction
specialist Curu { // "Skill/Craft" in Sindarin
    model: "ner-fine-tuned-7b",
    domain: "named entity recognition",
    memory: 8.GB,
    temperature: 0.1,

    receive Extract(text: String) -> List<Entity> {
        let raw = infer("Extract all named entities (people, organizations, locations, dates) from:\n\n{text}")
        parse_entities(raw)
    }
}

// Specialist: Sentiment Analysis
specialist Silma { // "Crystal/Clarity" in Sindarin
    model: "sentiment-7b",
    domain: "sentiment analysis",
    memory: 8.GB,
    temperature: 0.1,

    receive Analyze(text: String) -> SentimentResult {
        infer_typed<SentimentResult>(
            "Analyze the sentiment of this text. Return positive, negative, or neutral with a confidence score:\n\n{text}"
        )
    }
}

// Specialist: Topic Classification
specialist Penna { // "Teller/Writer" in Sindarin
    model: "classifier-7b",
    domain: "topic classification",
    memory: 8.GB,
    temperature: 0.2,

    receive Classify(text: String) -> List<Topic> {
        infer_typed<List<Topic>>(
            "Classify this text into relevant topics:\n\n{text}"
        )
    }
}
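The specialists return `SentimentResult` and `List<Topic>`, and the hive below returns a `DocumentAnalysis`; none of these types are defined above, and `parse_entities` (used by Curu) is likewise left to the reader. Plausible shapes, stated here as assumptions:

// Assumed result types (not part of the original example)
struct SentimentResult {
    sentiment: Sentiment,
    confidence: f64
}

struct Topic {
    name: String,
    confidence: f64
}

struct DocumentAnalysis {
    summary: String,
    entities: List<Entity>,
    sentiment: SentimentResult,
    topics: List<Topic>
}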
// The Hive
hive DocumentAnalyzer {
    specialists: [Isto, Curu, Silma, Penna],
    router: SemanticRouter(
        embedding_model: "all-minilm-l6-v2"
    ),
    strategy: OneForOne,
    max_restarts: 3,
    memory: SharedVectorStore(dimension: 384),

    // Analyze a document using all specialists
    receive AnalyzeDocument(doc: String) -> DocumentAnalysis {
        let (summary, entities, sentiment, topics) = await parallel(
            ask(Isto, Summarize(doc)),
            ask(Curu, Extract(doc)),
            ask(Silma, Analyze(doc)),
            ask(Penna, Classify(doc))
        )
        DocumentAnalysis { summary, entities, sentiment, topics }
    }
}
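A hive is used like any other actor: spawn it and `ask` it. A sketch, with `read_file` assumed as before.

// Usage sketch
fn main() {
    let analyzer = spawn DocumentAnalyzer
    let analysis = ask(analyzer, AnalyzeDocument(read_file("report.txt")))
    print("Summary: {analysis.summary}")
}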
Simple CRM
A basic CRM system demonstrating data management with AI-enhanced features.
struct Contact {
    id: String,
    name: String,
    email: String,
    company: Option<String>,
    notes: List<Note>,
    score: f64  // AI-generated lead score
}

struct Note {
    timestamp: DateTime,
    content: String,
    sentiment: Sentiment
}

actor ContactManager {
    var contacts: Map<String, Contact> = {}

    receive AddContact(name: String, email: String) -> Contact {
        let contact = Contact {
            id: generate_id(),
            name,
            email,
            company: None,
            notes: [],
            score: 0.5
        }
        contacts.insert(contact.id, contact)
        checkpoint()
        contact
    }

    receive AddNote(contact_id: String, content: String) -> Result<Note, Error> {
        let contact = contacts.get_mut(contact_id)?

        // AI-analyze the note
        let sentiment = await ai::classify<Sentiment>(content)
        let note = Note {
            timestamp: DateTime::now(),
            content,
            sentiment
        }
        contact.notes.push(note)

        // Update lead score based on interaction history
        contact.score = calculate_lead_score(contact)
        checkpoint()
        Ok(note)
    }
    receive GetSuggestedResponse(contact_id: String) -> Result<String, Error> {
        let contact = contacts.get(contact_id)?
        let history = contact.notes
            .map(n => n.content)
            .join("\n")
        let suggestion = await ai::complete("
            Based on this interaction history with {contact.name}:
            {history}
            Suggest a professional follow-up message:")
        Ok(suggestion)
    }
    receive GetTopLeads(limit: i64) -> List<Contact> {
        contacts
            .values()
            .sort_by(c => c.score, descending)
            .take(limit)
    }
}

fn calculate_lead_score(contact: &Contact) -> f64 {
    // Blend interaction volume (capped at 10) with the share of positive interactions
    let interaction_count = contact.notes.len() as f64
    let positive_ratio = contact.notes
        .filter(n => n.sentiment == Sentiment::Positive)
        .len() as f64 / interaction_count.max(1.0)
    (interaction_count.min(10.0) / 10.0) * 0.4 + positive_ratio * 0.6
}