Document Processing Pipeline

A complete example demonstrating actors, AI integration, supervision, and parallel processing.

document-pipeline.sx
// Types
struct Document {
    id: String,
    content: String,
    metadata: Map<String, String>
}

struct ProcessedDoc {
    id: String,
    summary: String,
    entities: List<Entity>,
    sentiment: Sentiment,
    embedding: Vector<f64, 1536>
}

struct Entity {
    text: String,
    entity_type: String,
    confidence: f64
}

enum Sentiment { Positive, Neutral, Negative }

// Document Processor Actor
actor DocumentProcessor {
    var processed_count: i64 = 0
    var current_doc: Option<Document> = None

    receive Process(doc: Document) -> ProcessedDoc {
        // Track current document for recovery
        current_doc = Some(doc)
        checkpoint()

        // Parallel AI operations
        let (summary, entities, sentiment, embedding) = await parallel(
            ai::complete("Summarize in 2-3 sentences: {doc.content}"),
            ai::extract<List<Entity>>("Extract named entities: {doc.content}"),
            ai::classify<Sentiment>(doc.content),
            ai::embed(doc.content)
        )

        processed_count += 1
        current_doc = None

        ProcessedDoc {
            id: doc.id,
            summary,
            entities,
            sentiment,
            embedding
        }
    }

    receive GetStats -> i64 {
        processed_count
    }

    // Resume processing after crash
    on_resume() {
        if let Some(doc) = current_doc {
            print("Resuming processing of {doc.id}")
        }
    }
}

// Storage Actor
actor DocumentStorage {
    var documents: Map<String, ProcessedDoc> = {}
    var embeddings: Map<String, Vector<f64, 1536>> = {}

    receive Store(doc: ProcessedDoc) {
        documents.insert(doc.id, doc)
        embeddings.insert(doc.id, doc.embedding)
        checkpoint()
    }

    receive Search(query: String, limit: i64) -> List<ProcessedDoc> {
        let query_embedding = await ai::embed(query)

        embeddings
            .entries()
            .map((id, emb) => (id, ai::similarity(query_embedding, emb)))
            .sort_by((_, score) => score, descending)
            .take(limit)
            .map((id, _) => documents[id])
    }

    receive Get(id: String) -> Option<ProcessedDoc> {
        documents.get(id)
    }
}

// Supervision tree
supervisor PipelineSupervisor {
    strategy: OneForOne,
    max_restarts: 5,
    within: Duration::minutes(1),

    children: [
        child(DocumentProcessor, restart: Always),
        child(DocumentStorage, restart: Always)
    ]
}

// Main entry point
fn main() {
    let supervisor = spawn PipelineSupervisor
    let processor = supervisor.get_child(DocumentProcessor)
    let storage = supervisor.get_child(DocumentStorage)

    // Process documents
    let doc = Document {
        id: "doc-001",
        content: "Simplex is a new programming language...",
        metadata: {}
    }

    let processed = ask(processor, Process(doc))
    send(storage, Store(processed))

    // Search documents
    let results = ask(storage, Search("programming languages", 5))
    print("Found {results.len()} matching documents")
}
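
The same actors handle batches without any new machinery. A minimal sketch, assuming `ask` blocks for each reply as it does in `main` above (`process_batch` is a name introduced here for illustration):

fn process_batch(processor: ActorRef<DocumentProcessor>,
                 storage: ActorRef<DocumentStorage>,
                 docs: List<Document>) {
    for doc in docs {
        // Process one document at a time; the actor itself runs the
        // four AI calls for each document in parallel (see Process above)
        let processed = ask(processor, Process(doc))
        send(storage, Store(processed))
    }
}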

AI Chatbot with Memory

A conversational AI chatbot that maintains context across messages.

chatbot.sx
struct Message {
    role: Role,
    content: String
}

enum Role { User, Assistant, System }

actor Chatbot {
    var history: List<Message> = []
    var system_prompt: String

    fn new(system: String) -> Self {
        Chatbot {
            history: [Message { role: Role::System, content: system }],
            system_prompt: system
        }
    }

    receive Chat(user_message: String) -> String {
        // Add user message to history
        history.push(Message {
            role: Role::User,
            content: user_message
        })

        // Build prompt from history
        let prompt = build_prompt(history)

        // Get AI response
        let response = await ai::complete(prompt)

        // Add assistant response to history
        history.push(Message {
            role: Role::Assistant,
            content: response
        })

        // Persist conversation state
        checkpoint()

        response
    }

    receive Reset {
        history = [Message {
            role: Role::System,
            content: system_prompt
        }]
        checkpoint()
    }

    receive GetHistory -> List<Message> {
        history
    }
}

fn build_prompt(messages: List<Message>) -> String {
    messages
        .map(m => "{m.role}: {m.content}")
        .join("\n")
}
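
A minimal usage sketch. Spawning with constructor arguments (`spawn Chatbot::new(...)`) is an assumption here; the pipeline example only shows argument-free `spawn`:

fn main() {
    let bot = spawn Chatbot::new("You are a concise, helpful assistant.")

    let reply = ask(bot, Chat("What is Simplex?"))
    print(reply)

    // History persists across messages until explicitly cleared
    send(bot, Reset)
}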

RAG (Retrieval-Augmented Generation)

A knowledge-base chatbot that retrieves relevant context before generating responses.

rag.sx
struct KnowledgeChunk {
    id: String,
    content: String,
    embedding: Vector<f64, 1536>,
    source: String
}

actor KnowledgeBase {
    var chunks: List<KnowledgeChunk> = []

    receive Ingest(content: String, source: String) {
        // Split into chunks
        let text_chunks = split_into_chunks(content, 500)

        // Embed all chunks in parallel
        let embeddings = await ai::embed_batch(text_chunks)

        // Store chunks
        for (text, embedding) in text_chunks.zip(embeddings) {
            chunks.push(KnowledgeChunk {
                id: generate_id(),
                content: text,
                embedding,
                source
            })
        }
        checkpoint()
    }

    receive Search(query: String, k: i64) -> List<KnowledgeChunk> {
        let query_embedding = await ai::embed(query)

        chunks
            .map(c => (c, ai::similarity(query_embedding, c.embedding)))
            .sort_by((_, score) => score, descending)
            .take(k)
            .map((chunk, _) => chunk)
    }
}
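
// The listing above assumes helpers such as split_into_chunks and
// generate_id. A naive sketch of the chunker: fixed-size character
// windows. The `while` loop and `slice` method are assumptions, and a
// real chunker would respect sentence boundaries.
fn split_into_chunks(content: String, max_len: i64) -> List<String> {
    var chunks: List<String> = []
    var start = 0
    while start < content.len() {
        chunks.push(content.slice(start, start + max_len))
        start += max_len
    }
    chunks
}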

actor RAGChatbot {
    var kb: ActorRef<KnowledgeBase>

    fn new(knowledge_base: ActorRef<KnowledgeBase>) -> Self {
        RAGChatbot { kb: knowledge_base }
    }

    receive Ask(question: String) -> String {
        // Retrieve relevant context
        let relevant_chunks = ask(kb, Search(question, 3))

        // Build context string
        let context = relevant_chunks
            .map(c => "Source: {c.source}\n{c.content}")
            .join("\n\n")

        // Generate answer with context
        let prompt = "
Based on the following context, answer the question.

Context:
{context}

Question: {question}

Answer:"

        await ai::complete(prompt)
    }
}
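
Wiring the two actors together might look like the following sketch; spawn-with-constructor and the `read_file` helper are assumptions introduced for illustration, not confirmed API:

fn main() {
    let kb = spawn KnowledgeBase
    let bot = spawn RAGChatbot::new(kb)

    // Ingest a document, then ask questions grounded in it
    send(kb, Ingest(read_file("docs/guide.md"), "guide.md"))

    let answer = ask(bot, Ask("How does supervision work?"))
    print(answer)
}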

Cognitive Hive Example

A complete Cognitive Hive AI (CHAI) hive for document analysis with multiple specialists.

document-hive.sx
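// Result types used by the specialists and hive below. The original
// listing elides them; these are assumed definitions, shaped to match
// how the fields are used in AnalyzeDocument. Entity and Sentiment are
// reused from document-pipeline.sx.
struct SentimentResult {
    sentiment: Sentiment,
    confidence: f64
}

struct Topic {
    name: String,
    confidence: f64
}

struct DocumentAnalysis {
    summary: String,
    entities: List<Entity>,
    sentiment: SentimentResult,
    topics: List<Topic>
}
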
// Specialist: Summarization
specialist Isto {  // "Knowledge" in Sindarin
    model: "summarization-7b",
    domain: "text summarization",
    memory: 8.GB,
    temperature: 0.3,

    receive Summarize(text: String) -> String {
        infer("Summarize the following text concisely:\n\n{text}")
    }
}

// Specialist: Entity Extraction
specialist Curu {  // "Skill/Craft" in Sindarin
    model: "ner-fine-tuned-7b",
    domain: "named entity recognition",
    memory: 8.GB,
    temperature: 0.1,

    receive Extract(text: String) -> List<Entity> {
        let raw = infer("Extract all named entities (people, organizations, locations, dates) from:\n\n{text}")
        parse_entities(raw)
    }
}

// Specialist: Sentiment Analysis
specialist Silma {  // "Crystal/Clarity" in Sindarin
    model: "sentiment-7b",
    domain: "sentiment analysis",
    memory: 8.GB,
    temperature: 0.1,

    receive Analyze(text: String) -> SentimentResult {
        infer_typed<SentimentResult>(
            "Analyze the sentiment of this text. Return positive, negative, or neutral with a confidence score:\n\n{text}"
        )
    }
}

// Specialist: Topic Classification
specialist Penna {  // "Teller/Writer" in Sindarin
    model: "classifier-7b",
    domain: "topic classification",
    memory: 8.GB,
    temperature: 0.2,

    receive Classify(text: String) -> List<Topic> {
        infer_typed<List<Topic>>(
            "Classify this text into relevant topics:\n\n{text}"
        )
    }
}

// The Hive
hive DocumentAnalyzer {
    specialists: [Isto, Curu, Silma, Penna],

    router: SemanticRouter(
        embedding_model: "all-minilm-l6-v2"
    ),

    strategy: OneForOne,
    max_restarts: 3,

    memory: SharedVectorStore(dimension: 384),

    // Analyze a document using all specialists
    receive AnalyzeDocument(doc: String) -> DocumentAnalysis {
        let (summary, entities, sentiment, topics) = await parallel(
            ask(Isto, Summarize(doc)),
            ask(Curu, Extract(doc)),
            ask(Silma, Analyze(doc)),
            ask(Penna, Classify(doc))
        )

        DocumentAnalysis { summary, entities, sentiment, topics }
    }
}
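
Spawning and querying the hive follows the same pattern as plain actors; a minimal sketch, assuming a hive answers `ask` the way an actor does:

fn main() {
    let analyzer = spawn DocumentAnalyzer

    let report = ask(analyzer, AnalyzeDocument("Acme Corp reported record growth..."))
    print("Summary: {report.summary}")
    print("Sentiment: {report.sentiment.sentiment} ({report.sentiment.confidence})")
}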

Simple CRM

A basic CRM system demonstrating data management with AI-enhanced features.

crm.sx
struct Contact {
    id: String,
    name: String,
    email: String,
    company: Option<String>,
    notes: List<Note>,
    score: f64  // AI-generated lead score
}

struct Note {
    timestamp: DateTime,
    content: String,
    sentiment: Sentiment
}

actor ContactManager {
    var contacts: Map<String, Contact> = {}

    receive AddContact(name: String, email: String) -> Contact {
        let contact = Contact {
            id: generate_id(),
            name,
            email,
            company: None,
            notes: [],
            score: 0.5
        }
        contacts.insert(contact.id, contact)
        checkpoint()
        contact
    }

    receive AddNote(contact_id: String, content: String) -> Result<Note, Error> {
        let contact = contacts.get_mut(contact_id)?

        // AI-analyze the note
        let sentiment = await ai::classify<Sentiment>(content)

        let note = Note {
            timestamp: DateTime::now(),
            content,
            sentiment
        }

        contact.notes.push(note)

        // Update lead score based on interaction history
        contact.score = calculate_lead_score(contact)

        checkpoint()
        Ok(note)
    }

    receive GetSuggestedResponse(contact_id: String) -> Result<String, Error> {
        let contact = contacts.get(contact_id)?

        let history = contact.notes
            .map(n => n.content)
            .join("\n")

        Ok(await ai::complete("
Based on this interaction history with {contact.name}:
{history}

Suggest a professional follow-up message:"))
    }

    receive GetTopLeads(limit: i64) -> List<Contact> {
        contacts
            .values()
            .sort_by(c => c.score, descending)
            .take(limit)
    }
}

fn calculate_lead_score(contact: &Contact) -> f64 {
    let interaction_count = contact.notes.len() as f64
    let positive_ratio = contact.notes
        .filter(n => n.sentiment == Sentiment::Positive)
        .len() as f64 / interaction_count.max(1.0)

    (interaction_count.min(10.0) / 10.0) * 0.4 + positive_ratio * 0.6
}
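
End-to-end usage might look like the sketch below. Since AddNote and GetSuggestedResponse return Result, the suggested-response reply is unwrapped with if let:

fn main() {
    let crm = spawn ContactManager

    let alice = ask(crm, AddContact("Alice Chen", "alice@example.com"))
    ask(crm, AddNote(alice.id, "Very interested in the enterprise plan."))

    if let Ok(draft) = ask(crm, GetSuggestedResponse(alice.id)) {
        print("Suggested follow-up:\n{draft}")
    }

    for lead in ask(crm, GetTopLeads(5)) {
        print("{lead.name}: {lead.score}")
    }
}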

Next Steps