Unified.to

How to Build a RAG Pipeline for Live SaaS Data


June 10, 2026

Most RAG tutorials use Google Drive or Confluence as the data source. The architecture they describe — fetch files, chunk them, embed them, store them — works for relatively static document corpora. It breaks down when the data source is a CRM, an ATS, a ticketing platform, or an accounting system.

The difference is not incidental. Live transactional SaaS data changes continuously, carries per-record permission constraints that evolve over time, and includes fields — deal stages, ticket statuses, invoice balances — where a stale embedding produces an agent that acts on state that no longer exists.

This guide covers the architecture for a RAG pipeline built on live SaaS data: event-driven ingestion, selective re-embedding, permission handling by category, and the hybrid pattern that combines indexed retrieval with real-time API reads for transactional fields.

It uses Unified as the ingestion and change detection layer. Chunking, embedding, vector database choice, and retrieval logic are your responsibility — the boundary is explicit throughout.

For the foundational concepts behind retrieval timing decisions, see Index-Time RAG vs Real-Time RAG. For a detailed treatment of keeping an index current after initial load, see Keeping Your RAG Index in Sync with Live SaaS Data.

Why live SaaS data requires a different ingestion architecture

Static document sources — Google Drive, Confluence, Notion — change slowly. A nightly rebuild or a periodic sync triggered by a file modification timestamp is sufficient for most use cases. The permission story is simpler too: document-level ACLs captured at ingestion time stay valid for long periods.

Live transactional sources behave differently across every dimension:

Change frequency. A CRM deal can move through multiple pipeline stages in a single day. A support ticket may be updated, reassigned, escalated, and closed within hours. An ATS application status changes every time a candidate advances or is rejected. Nightly rebuilds produce indexes that are wrong for most of the day.

Permission volatility. CRM records change ownership when deals are reassigned. Ticket access changes when customer accounts are transferred. Candidate records become restricted when hiring decisions are made. Permission changes in static document stores are infrequent; in operational SaaS data they are part of normal business processes.

Object identity. A Google Drive file has a stable ID and a modification timestamp — straightforward to track. A Salesforce deal has an ID, but it also has relationships: associated contacts, linked activities, attached notes. Changes to any of those may require re-evaluating what the indexed representation of that deal should contain.

Retrieval correctness. For document Q&A, a slightly stale answer is a minor inconvenience. For an agent that decides whether to escalate a ticket, update a pipeline stage, or flag an invoice for review, a stale answer is an incorrect action. The correctness bar is higher because the consumer acts on the data.

The architecture that follows addresses each of these.

The metadata model

Every chunk stored in your vector index needs a set of stable attributes that enable targeted updates, tenant isolation, and permission filtering. For live SaaS data, the required fields are:

type ChunkMetadata = {
  connection_id: string;   // Unified connection ID — your tenant boundary
  object_type: string;     // e.g. 'crm_deal', 'ats_candidate', 'ticketing_ticket'
  object_id: string;       // Source record ID from the SaaS platform
  chunk_index: number;     // Position of this chunk within the object
  updated_at: string;      // ISO-8601 timestamp of the source record's last update
  is_latest: boolean;      // true for current chunks; false when superseded by an update
};

connection_id is the tenant boundary. Every retrieval query must filter by connection_id to prevent cross-customer data exposure. This is not optional.
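You can make the tenant filter hard to forget by building every query filter from one helper that always includes it. The sketch below is minimal and rests on two assumptions. First, `baseFilter` is a hypothetical helper. Second, the `$eq`/`$in` operators follow the Pinecone-style filter syntax used elsewhere in this article.

```typescript
// Pinecone-style operators, matching the filter syntax used in this article.
type FilterCondition = { $eq: string | boolean } | { $in: string[] };
type MetadataFilter = Record<string, FilterCondition>;

// Hypothetical helper: every query filter starts from the tenant boundary and the
// current-version flag. Callers can add conditions but cannot override these two.
function baseFilter(connectionId: string, extra: MetadataFilter = {}): MetadataFilter {
  if (!connectionId) {
    throw new Error('connectionId is required for every retrieval query');
  }
  return {
    ...extra,
    // Set after the spread so a caller-supplied connection_id or is_latest is overwritten.
    connection_id: { $eq: connectionId },
    is_latest: { $eq: true },
  };
}

// A caller that tries to widen the tenant scope gets the mandatory value back.
const filter = baseFilter('conn_1', {
  owner_id: { $eq: 'user_42' },
  connection_id: { $eq: 'conn_2' },
});
console.log(JSON.stringify(filter));
// {"owner_id":{"$eq":"user_42"},"connection_id":{"$eq":"conn_1"},"is_latest":{"$eq":true}}
```

The retrieval functions later in this guide could pass their extra conditions, such as the ownership filter, through a helper like this. They would then never construct the tenant condition by hand.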

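connection_id, object_type, object_id, and chunk_index together form a deterministic chunk ID. Including connection_id and object_type matters because source IDs are only guaranteed unique within one account, and not necessarily across object types. When a source record updates, you target all chunks matching its connection_id, object_type, and object_id, mark them is_latest: false, and upsert the new chunks with is_latest: true. Because the IDs are deterministic, each new chunk overwrites the old chunk at the same index. Any leftover chunks from a longer previous version stay marked is_latest: false. Retrieval filters to is_latest: true only.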

is_latest is a field you maintain in your vector store — Unified does not return it. It is the mechanism that prevents stale and current versions of the same record from both surfacing in retrieval results.
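The stale-then-upsert lifecycle is easier to reason about against a toy store. The sketch below is minimal. `InMemoryChunkStore` is a hypothetical stand-in for your vector store, and embeddings are omitted. It also shows one use of updated_at that goes beyond the steps in this guide: in case retries arrive out of order, it skips any delivery that is older than the version already indexed.

```typescript
type ChunkMetadata = {
  connection_id: string;
  object_type: string;
  object_id: string;
  chunk_index: number;
  updated_at: string;
  is_latest: boolean;
};

type StoredChunk = { id: string; text: string; metadata: ChunkMetadata };

// Hypothetical stand-in for a vector store. Embeddings are omitted so the
// example focuses on deterministic IDs and the is_latest lifecycle.
class InMemoryChunkStore {
  private chunks = new Map<string, StoredChunk>();

  // Deterministic chunk ID built from every field that identifies the chunk.
  // encodeURIComponent escapes ':' so a colon inside a source ID cannot blur fields.
  static chunkId(m: ChunkMetadata): string {
    return [m.connection_id, m.object_type, m.object_id, String(m.chunk_index)]
      .map(encodeURIComponent)
      .join(':');
  }

  // Replace a record's indexed content. Returns false (and changes nothing)
  // when the incoming version is older than the one already indexed.
  replaceRecord(
    connectionId: string,
    objectType: string,
    objectId: string,
    updatedAt: string,
    texts: string[]
  ): boolean {
    const current = this.latestFor(connectionId, objectType, objectId);
    if (current.some((c) => Date.parse(c.metadata.updated_at) > Date.parse(updatedAt))) {
      return false; // out-of-order delivery: a newer version is already indexed
    }
    // Mark the existing chunks stale...
    for (const c of current) c.metadata.is_latest = false;
    // ...then upsert the new ones; same-index chunks are overwritten in place.
    texts.forEach((text, chunkIndex) => {
      const metadata: ChunkMetadata = {
        connection_id: connectionId,
        object_type: objectType,
        object_id: objectId,
        chunk_index: chunkIndex,
        updated_at: updatedAt,
        is_latest: true,
      };
      const id = InMemoryChunkStore.chunkId(metadata);
      this.chunks.set(id, { id, text, metadata });
    });
    return true;
  }

  latestFor(connectionId: string, objectType: string, objectId: string): StoredChunk[] {
    return this.search(connectionId).filter(
      (c) => c.metadata.object_type === objectType && c.metadata.object_id === objectId
    );
  }

  // Retrieval-side filter: tenant boundary plus current versions only.
  search(connectionId: string): StoredChunk[] {
    return Array.from(this.chunks.values()).filter(
      (c) => c.metadata.connection_id === connectionId && c.metadata.is_latest
    );
  }

  size(): number {
    return this.chunks.size;
  }
}

const store = new InMemoryChunkStore();
store.replaceRecord('conn_1', 'ticketing_ticket', 't1', '2026-06-01T10:00:00Z', ['Printer jams', 'on floor 3']);
store.replaceRecord('conn_1', 'ticketing_ticket', 't1', '2026-06-01T12:00:00Z', ['Printer fixed']);
// A delayed retry of the first version is ignored.
const applied = store.replaceRecord('conn_1', 'ticketing_ticket', 't1', '2026-06-01T10:00:00Z', ['Printer jams']);
console.log(applied); // false
console.log(store.search('conn_1').map((c) => c.text)); // [ 'Printer fixed' ]
console.log(store.size()); // 2 (chunk_index 1 remains, with is_latest: false)
```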

Step 1: Subscribe to webhook events with backfill

The first step is creating webhook subscriptions for each object type you want to index. Setting include_all: true triggers backfill — Unified delivers all existing records to your endpoint before transitioning to incremental change events on the same subscription.

import { UnifiedTo } from '@unified-api/typescript-sdk';

const sdk = new UnifiedTo({
  security: { jwt: process.env.UNIFIED_API_KEY! },
});

async function subscribeToObjectType(
  connectionId: string,
  objectType: string,
  hookUrl: string
) {
  const webhook = await sdk.unified.createUnifiedWebhook({
    include_all: true,           // triggers backfill of existing records
    connection_id: connectionId,
    hook_url: hookUrl,
    object_type: objectType,
    event: 'updated',            // 'updated' fires on both creates and updates
    interval: 1,                 // minutes between checks (paid accounts); 60 for free
  });

  return webhook;
}

// Subscribe for the object types relevant to your RAG pipeline.
// ats_application is included because the ingestion and hybrid-retrieval code handle it.
const CONNECTION_ID = process.env.UNIFIED_CONNECTION_ID!;
const WEBHOOK_BASE_URL = process.env.WEBHOOK_BASE_URL!;
const OBJECT_TYPES = [
  'crm_deal',
  'crm_contact',
  'ats_candidate',
  'ats_application',
  'ticketing_ticket',
  'accounting_invoice',
];

for (const objectType of OBJECT_TYPES) {
  await subscribeToObjectType(CONNECTION_ID, objectType, `${WEBHOOK_BASE_URL}/ingest`);
}

Using event: 'updated' rather than event: 'created' covers both new and updated records in a single subscription. The interval field sets how frequently Unified polls source APIs that don't support native webhooks — for integrations with native webhook support, events arrive immediately regardless of the interval setting.

During backfill, your endpoint receives payloads tagged type: 'INITIAL-PARTIAL' for each page of existing records, followed by type: 'INITIAL-COMPLETE' when backfill finishes. After that, the same subscription delivers ongoing changes tagged type: 'NATIVE' or type: 'VIRTUAL' depending on whether the source integration supports native webhooks.
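Indexing does not need the `type` field, because every payload is processed the same way. It is still a convenient signal for knowing when a tenant's initial load has finished, for example so you can hold back answers until the index is complete. The sketch below is minimal. `BackfillTracker` is hypothetical, and it keeps state in memory only.

```typescript
// The four payload type values described above.
type WebhookPayloadType = 'INITIAL-PARTIAL' | 'INITIAL-COMPLETE' | 'NATIVE' | 'VIRTUAL';

// Hypothetical tracker: remembers, per connection and object type, whether the
// initial backfill has finished. Keep this in a database in production.
class BackfillTracker {
  private completed = new Set<string>();

  private static key(connectionId: string, objectType: string): string {
    return `${connectionId}/${objectType}`;
  }

  // Record a delivery and report which phase the payload belongs to.
  observe(
    connectionId: string,
    objectType: string,
    type: WebhookPayloadType
  ): 'backfill' | 'incremental' {
    if (type === 'INITIAL-COMPLETE') {
      this.completed.add(BackfillTracker.key(connectionId, objectType));
    }
    // NATIVE and VIRTUAL both carry ongoing changes and are handled identically.
    return type === 'INITIAL-PARTIAL' || type === 'INITIAL-COMPLETE' ? 'backfill' : 'incremental';
  }

  // Whether this tenant's index holds the full historical load for an object type.
  isReady(connectionId: string, objectType: string): boolean {
    return this.completed.has(BackfillTracker.key(connectionId, objectType));
  }
}

const tracker = new BackfillTracker();
tracker.observe('conn_1', 'crm_deal', 'INITIAL-PARTIAL');
console.log(tracker.isReady('conn_1', 'crm_deal')); // false
tracker.observe('conn_1', 'crm_deal', 'INITIAL-COMPLETE');
console.log(tracker.isReady('conn_1', 'crm_deal')); // true
console.log(tracker.observe('conn_1', 'crm_deal', 'NATIVE')); // incremental
```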

Step 2: Handle incoming webhook payloads

Your webhook endpoint receives the same payload structure for backfill and incremental updates. Process them identically — no separate backfill logic to maintain.

import express from 'express';
import { createHmac, timingSafeEqual } from 'crypto';

const app = express();
app.use(express.json());

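app.post('/ingest', async (req, res) => {
  // Validate signature before processing
  if (!validateWebhookSignature(req.body, process.env.UNIFIED_WORKSPACE_SECRET!)) {
    res.status(401).json({ error: 'Invalid signature' });
    return;
  }

  // type is INITIAL-PARTIAL, INITIAL-COMPLETE, NATIVE, or VIRTUAL; all are processed the same way
  const { data = [], webhook, type } = req.body;

  try {
    // data is an array of objects; process each one
    for (const record of data) {
      await processRecord(record, webhook.connection_id, webhook.object_type);
    }
  } catch (err) {
    // Return non-200 so Unified retries the delivery
    console.error(`Failed to process ${type} payload`, err);
    res.status(500).json({ error: 'Processing failed' });
    return;
  }

  // Acknowledge once processing succeeds. Unified retries on non-200 responses, so if
  // processing is slow (large backfill pages, embedding latency), enqueue the records
  // and respond immediately rather than holding the request open.
  res.status(200).json({ received: true });
});

app.listen(Number(process.env.PORT ?? 3000));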

function validateWebhookSignature(
  payload: { data: any[]; nonce: string; sig256: string },
  secret: string
): boolean {
  const serializedData = JSON.stringify(payload.data);
  const computedSig = Buffer.from(
    createHmac('sha256', secret)
      .update(serializedData)
      .update(payload.nonce)
      .digest('base64'),
    'utf8'
  );
  const providedSig = Buffer.from(payload.sig256, 'utf8');
  return (
    computedSig.length === providedSig.length &&
    timingSafeEqual(computedSig, providedSig)
  );
}
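To test the validator locally, you need signatures produced with the same scheme it checks. That scheme is HMAC-SHA256 over `JSON.stringify(data)` followed by the nonce, base64-encoded. The sketch below is minimal. `signPayload` is a hypothetical test helper that mirrors the validator; it is not part of Unified's SDK. The verifier is repeated so the block runs on its own.

```typescript
import { createHmac, timingSafeEqual } from 'crypto';

type SignedPayload = { data: unknown[]; nonce: string; sig256: string };

// Hypothetical test helper: signs a payload the same way validateWebhookSignature checks it.
function signPayload(data: unknown[], nonce: string, secret: string): SignedPayload {
  const sig256 = createHmac('sha256', secret)
    .update(JSON.stringify(data))
    .update(nonce)
    .digest('base64');
  return { data, nonce, sig256 };
}

// Same comparison as validateWebhookSignature: constant-time, length-checked.
function verifyPayload(payload: SignedPayload, secret: string): boolean {
  const expected = Buffer.from(
    createHmac('sha256', secret)
      .update(JSON.stringify(payload.data))
      .update(payload.nonce)
      .digest('base64'),
    'utf8'
  );
  const provided = Buffer.from(payload.sig256, 'utf8');
  return expected.length === provided.length && timingSafeEqual(expected, provided);
}

const secret = 'test-secret';
const signed = signPayload([{ id: 'deal_1', name: 'Renewal' }], 'nonce-123', secret);
console.log(verifyPayload(signed, secret)); // true

// Changing the data after signing must invalidate the signature.
const tampered = { ...signed, data: [{ id: 'deal_1', name: 'Changed' }] };
console.log(verifyPayload(tampered, secret)); // false
```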

Step 3: Fetch the full record and build chunk metadata

Webhook payloads contain the object data directly — but for large objects (long ticket descriptions, candidate resumes, detailed CRM notes), you may want to fetch the full record to ensure you have complete content before chunking.

// generateEmbedding, upsertToVectorStore, and chunkText are placeholders for your
// embedding client, vector store client, and chunking strategy.
async function processRecord(
  record: any,
  connectionId: string,
  objectType: string
): Promise<void> {
  const objectId = record.id;
  const updatedAt = record.updated_at;

  // Fetch the full record from the source API to ensure complete content before chunking.
  // Webhook payloads may truncate large text fields on some integrations.
  let fullRecord = record;
  if (objectType === 'crm_deal') {
    const result = await sdk.crm.getCrmDeal({ connectionId, id: objectId });
    if (result) fullRecord = result;
  } else if (objectType === 'ats_candidate') {
    const result = await sdk.ats.getAtsCandidate({ connectionId, id: objectId });
    if (result) fullRecord = result;
  } else if (objectType === 'ats_application') {
    const result = await sdk.ats.getAtsApplication({ connectionId, id: objectId });
    if (result) fullRecord = result;
  }

  // Extract the text content to index
  const textContent = extractTextContent(fullRecord, objectType);
  if (!textContent) {
    // Nothing to index now; retire chunks left over from a previous version
    await markChunksStale(connectionId, objectType, objectId);
    return;
  }

  // Chunk and embed before touching the index, so the record's current chunks
  // stay retrievable until their replacements are ready
  const chunks = chunkText(textContent);
  const embeddings = await Promise.all(chunks.map((chunk) => generateEmbedding(chunk)));

  // Mark existing chunks for this object as stale
  await markChunksStale(connectionId, objectType, objectId);

  // Upsert new chunks with current metadata
  for (let i = 0; i < chunks.length; i++) {
    const metadata: ChunkMetadata = {
      connection_id: connectionId,
      object_type: objectType,
      object_id: objectId,
      chunk_index: i,
      updated_at: updatedAt,
      is_latest: true,
    };

    await upsertToVectorStore(chunks[i], embeddings[i], metadata);
  }
}

function extractTextContent(record: any, objectType: string): string | null {
  // Webhook payloads use the API's snake_case field names; records re-fetched through
  // the TypeScript SDK may expose camelCase properties instead, so read either form.
  const field = (obj: any, snake: string, camel: string) => obj?.[snake] ?? obj?.[camel];

  // Extract the relevant text fields per object type
  switch (objectType) {
    case 'crm_deal':
      // CrmDeal text fields: name and stages are broadly supported across integrations.
      // source is readable on Salesforce/HubSpot/Copper only.
      // tags, lost_reason, won_reason have minimal cross-integration support, so they are omitted.
      // metadata carries custom fields where available (Salesforce, HubSpot, Pipedrive, Zoho).
      return [
        record.name,
        record.stages?.map((s: any) => s.name).filter(Boolean).join(', '),
        record.source,
        record.metadata
          ?.map((m: any) => {
            // Custom field values can be objects or arrays; serialize those instead of printing [object Object]
            const value = typeof m.value === 'object' && m.value !== null ? JSON.stringify(m.value) : m.value;
            return [m.slug, value].filter(Boolean).join(': ');
          })
          .filter(Boolean)
          .join('; '),
      ].filter(Boolean).join('\n');
    case 'crm_contact':
      // CrmContact text fields: name, title, and company
      return [record.name, record.title, record.company].filter(Boolean).join('\n');
    case 'ats_candidate':
      // AtsCandidate text fields: name, title, company name, skills, and a one-line summary per experience
      return [
        record.name,
        record.title,
        field(record, 'company_name', 'companyName'),
        record.skills?.join(', '),
        record.experiences
          ?.map((e: any) => [e.title, field(e, 'company_name', 'companyName')].filter(Boolean).join(' at '))
          .join(', '),
      ].filter(Boolean).join('\n');
    case 'ats_application':
      // AtsApplication text fields: status, source, rejected_reason
      return [
        record.status,
        record.source,
        field(record, 'rejected_reason', 'rejectedReason'),
      ].filter(Boolean).join('\n');
    case 'ticketing_ticket':
      // TicketingTicket text fields: subject, description, tags, source, priority
      return [
        record.subject,
        record.description,
        record.tags?.join(', '),
        record.source,
        record.priority,
      ].filter(Boolean).join('\n');
    case 'accounting_invoice':
      // AccountingInvoice text fields: invoice_number and notes are broadly supported
      // (28 and 25 integrations respectively, including QuickBooks, Xero, NetSuite, Sage Intacct).
      // reference (6 integrations) and refund_reason (5 integrations) are omitted as unreliable cross-integration.
      return [
        field(record, 'invoice_number', 'invoiceNumber'),
        record.notes,
      ].filter(Boolean).join('\n');
    default:
      return record.raw ? JSON.stringify(record.raw) : null;
  }
}

Step 4: Selective re-embedding on update

When a record updates, mark its existing chunks stale before inserting new ones. This prevents retrieval from returning both the old and new versions of the same record.

async function markChunksStale(
  connectionId: string,
  objectType: string,
  objectId: string
): Promise<void> {
  // In your vector store, update all chunks where:
  //   connection_id = connectionId AND object_type = objectType
  //   AND object_id = objectId AND is_latest = true
  // Set is_latest = false
  //
  // object_type is part of the match because source IDs are not guaranteed
  // to be unique across object types (a deal and a contact can share an ID).
  //
  // Implementation depends on your vector store:
  // Pinecone: look up the matching record IDs (for example by listing the
  //   deterministic ID prefix), then update each record's metadata by ID
  // pgvector: UPDATE chunks SET is_latest = false
  //   WHERE connection_id = $1 AND object_type = $2 AND object_id = $3 AND is_latest = true
  // Weaviate: fetch matching object IDs with a where filter, then patch each object
}

// At retrieval time, always filter to is_latest = true
async function retrieveRelevantChunks(
  query: string,
  connectionId: string,
  topK: number = 5
): Promise<any[]> {
  const queryEmbedding = await generateEmbedding(query);

  // Your vector store query must include both filters:
  // 1. connection_id = connectionId  (tenant isolation — required)
  // 2. is_latest = true              (current versions only)
  const results = await vectorStore.query({
    vector: queryEmbedding,
    topK,
    filter: {
      connection_id: { $eq: connectionId },
      is_latest: { $eq: true },
    },
  });

  return results;
}
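The ingestion code in Step 3 calls a `chunkText` helper and leaves its implementation to you. The sketch below is minimal and swaps in a simple character-window splitter with overlap. `chunkText` is hypothetical, and the `maxChars` and `overlap` defaults are illustrative, not recommendations. A token-based splitter tuned to your embedding model is usually the better production choice.

```typescript
// Hypothetical chunker: splits text into windows of at most maxChars characters.
// Each window starts `overlap` characters before the previous one ended, and each
// window prefers to end at a space so words are not cut in half.
function chunkText(text: string, maxChars = 1000, overlap = 100): string[] {
  if (overlap >= maxChars) throw new Error('overlap must be smaller than maxChars');
  const chunks: string[] = [];
  let start = 0;
  while (start < text.length) {
    let end = Math.min(start + maxChars, text.length);
    if (end < text.length) {
      // Back up to the last space inside the window, if there is one.
      const lastSpace = text.lastIndexOf(' ', end);
      if (lastSpace > start) end = lastSpace;
    }
    chunks.push(text.slice(start, end).trim());
    if (end >= text.length) break;
    // Step back by the overlap, but always advance at least one character.
    start = Math.max(end - overlap, start + 1);
  }
  return chunks.filter((c) => c.length > 0);
}

console.log(chunkText('alpha beta gamma delta epsilon', 12, 0));
// [ 'alpha beta', 'gamma delta', 'epsilon' ]
```

With a non-zero overlap, a sentence that straddles a window boundary can still be retrieved from either side. The cost is some duplicated text in the index.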

Step 5: The hybrid pattern for transactional fields

Not all fields from operational SaaS data should be indexed. Some fields — deal stage, ticket status, invoice balance, candidate pipeline position — change too frequently for an index to stay reliably current, and an agent acting on a stale version of these fields can take an incorrect action.

The production pattern for these fields is: use indexed retrieval to find which record is relevant, then fetch current state from the source API for transactional fields before passing context to the model.

async function buildAgentContext(
  query: string,
  connectionId: string
): Promise<string> {
  // Step 1: Find relevant records using indexed semantic search
  const relevantChunks = await retrieveRelevantChunks(query, connectionId);

  // Step 2: For transactional fields, fetch current state from source API
  // rather than relying on what was indexed
  const enrichedContext = await Promise.all(
    relevantChunks.map(async (chunk) => {
      // The fields parameter uses the API's snake_case field names; the SDK's returned
      // models expose camelCase properties (userId, balanceAmount, dueAt, ...).
      if (chunk.metadata.object_type === 'crm_deal') {
        // Fetch current deal state for transactional fields: stages, amount, currency, probability, owner
        // Note: top-level stage/stage_id are deprecated; use the stages array
        const currentDeal = await sdk.crm.getCrmDeal({
          connectionId,
          id: chunk.metadata.object_id,
          // currency is requested because the context line below prints it
          fields: 'stages,pipelines,amount,currency,probability,user_id,closing_at,closed_at',
        });
        const currentStage = currentDeal?.stages?.[0]?.name ?? 'unknown';
        return `${chunk.text}\n[Current stage: ${currentStage}, Owner: ${currentDeal?.userId}, Amount: ${currentDeal?.amount} ${currentDeal?.currency}]`;
      }

      if (chunk.metadata.object_type === 'ats_application') {
        // Fetch current application status, the primary transactional field for ATS
        const currentApp = await sdk.ats.getAtsApplication({
          connectionId,
          id: chunk.metadata.object_id,
          fields: 'status,original_status,hired_at,rejected_at,rejected_reason',
        });
        return `${chunk.text}\n[Current status: ${currentApp?.status}, Original: ${currentApp?.originalStatus}]`;
      }

      if (chunk.metadata.object_type === 'ticketing_ticket') {
        // Fetch current ticket status, priority, and assignee
        const currentTicket = await sdk.ticketing.getTicketingTicket({
          connectionId,
          id: chunk.metadata.object_id,
          fields: 'status,priority,user_id,closed_at,category_id',
        });
        return `${chunk.text}\n[Current status: ${currentTicket?.status}, Priority: ${currentTicket?.priority}, Assignee: ${currentTicket?.userId}]`;
      }

      if (chunk.metadata.object_type === 'accounting_invoice') {
        // Fetch current invoice status and balance, both highly transactional
        const currentInvoice = await sdk.accounting.getAccountingInvoice({
          connectionId,
          id: chunk.metadata.object_id,
          fields: 'status,balance_amount,paid_amount,total_amount,currency,due_at,paid_at',
        });
        return `${chunk.text}\n[Status: ${currentInvoice?.status}, Balance: ${currentInvoice?.balanceAmount} ${currentInvoice?.currency}, Due: ${currentInvoice?.dueAt}]`;
      }

      // For non-transactional fields (text content, historical notes), indexed
      // version is sufficient
      return chunk.text;
    })
  );

  return enrichedContext.join('\n\n---\n\n');
}

The fields that warrant live reads are those where correctness at execution time matters: status fields, owner/assignee fields, balance fields, and any field the agent's decision logic branches on. Text content — descriptions, notes, transcripts — is fine to serve from the index.
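Several chunks of the same record can rank in the top results. When that happens, the loop in `buildAgentContext` reads that record from the source API once per chunk. The sketch below collapses hits to one entry per record before the live reads, which also reduces pressure on source API rate limits. It is minimal: `dedupeByRecord` is a hypothetical helper, and it assumes each hit carries a score plus the metadata fields defined earlier.

```typescript
type Hit = {
  text: string;
  score: number;
  metadata: { object_type: string; object_id: string; chunk_index: number };
};

type Group = { objectType: string; objectId: string; best: number; chunks: Hit[] };

// Hypothetical helper: groups hits by (object_type, object_id), joins each record's
// chunks in chunk_index order, and orders records by their best-scoring chunk.
function dedupeByRecord(hits: Hit[]): { objectType: string; objectId: string; text: string }[] {
  const groups = new Map<string, Group>();
  for (const hit of hits) {
    const key = `${hit.metadata.object_type}:${hit.metadata.object_id}`;
    const group: Group = groups.get(key) ?? {
      objectType: hit.metadata.object_type,
      objectId: hit.metadata.object_id,
      best: -Infinity,
      chunks: [],
    };
    group.best = Math.max(group.best, hit.score);
    group.chunks.push(hit);
    groups.set(key, group);
  }
  return Array.from(groups.values())
    .sort((a, b) => b.best - a.best)
    .map((g) => ({
      objectType: g.objectType,
      objectId: g.objectId,
      text: g.chunks
        .sort((x, y) => x.metadata.chunk_index - y.metadata.chunk_index)
        .map((h) => h.text)
        .join('\n'),
    }));
}

const records = dedupeByRecord([
  { text: 'Q3 renewal for Acme', score: 0.91, metadata: { object_type: 'crm_deal', object_id: 'd1', chunk_index: 0 } },
  { text: 'Printer jams on floor 3', score: 0.88, metadata: { object_type: 'ticketing_ticket', object_id: 't7', chunk_index: 0 } },
  { text: 'Legal review pending', score: 0.85, metadata: { object_type: 'crm_deal', object_id: 'd1', chunk_index: 1 } },
]);
console.log(records.length); // 2: one live read for the deal, one for the ticket
```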

Permissions by category

Permission enforcement in RAG pipelines is handled differently depending on what the source API returns.

File storage (storage_file): Unified returns explicit permissions metadata on file objects, including owner and access control information. Capture these fields at ingestion time and store them as chunk metadata for filtering at retrieval.
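The sketch below shows one way to flatten file permissions into chunk metadata and check them at retrieval. It is minimal and rests on an assumption: the permission shape (`user_id`, `group_id`) is invented for illustration. Check the actual `storage_file` permissions schema before mapping it. Both `toAllowedPrincipals` and `canRead` are hypothetical helpers.

```typescript
// Assumed shape for illustration only; map from the real storage_file permissions field.
type FilePermission = { user_id?: string; group_id?: string };

// Flatten permissions into principal strings suitable for an array-valued
// metadata field, e.g. ['group:eng', 'user:u1'].
function toAllowedPrincipals(permissions: FilePermission[]): string[] {
  const principals = new Set<string>();
  for (const p of permissions) {
    if (p.user_id) principals.add(`user:${p.user_id}`);
    if (p.group_id) principals.add(`group:${p.group_id}`);
  }
  return Array.from(principals).sort();
}

// The check a metadata filter performs at query time: does any of the
// caller's principals appear in the chunk's allowed list?
function canRead(allowed: string[], userId: string, groupIds: string[]): boolean {
  const caller = new Set([`user:${userId}`, ...groupIds.map((g) => `group:${g}`)]);
  return allowed.some((p) => caller.has(p));
}

const allowed = toAllowedPrincipals([{ user_id: 'u1' }, { group_id: 'eng' }, { user_id: 'u1' }]);
console.log(allowed); // [ 'group:eng', 'user:u1' ]
console.log(canRead(allowed, 'u2', ['eng'])); // true
console.log(canRead(allowed, 'u2', ['sales'])); // false
```

In a vector store whose `$in` operator matches when any element of an array field is in the given list, the equivalent retrieval filter is a Pinecone-style `allowed_principals: { $in: callerPrincipals }`.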

CRM, ATS, ticketing, accounting: These categories return ownership fields (owner_id, user_id, assignee_id) but not user/group-level ACLs. Unified does not return permission structures for these object types — row-level access enforcement belongs in your application layer.

The practical pattern for CRM, ATS, and ticketing:

// At ingestion: capture ownership fields in chunk metadata
// (this runs inside processRecord's chunk loop, where record, i, and the IDs are in scope)
const metadata: ChunkMetadata & { owner_id?: string } = {
  connection_id: connectionId,
  object_type: objectType,
  object_id: objectId,
  chunk_index: i,
  updated_at: updatedAt,
  is_latest: true,
  // Capture ownership for row-level filtering
  owner_id: record.user_id ?? record.owner_id ?? record.assignee_id,
};

// At retrieval: filter by ownership before passing context to the model
async function retrieveForUser(
  query: string,
  connectionId: string,
  userId: string
): Promise<any[]> {
  const queryEmbedding = await generateEmbedding(query);

  return vectorStore.query({
    vector: queryEmbedding,
    topK: 5,
    filter: {
      connection_id: { $eq: connectionId },
      is_latest: { $eq: true },
      // Apply ownership filter in application layer
      // Note: for complex role hierarchies (Salesforce territory rules,
      // ATS hiring team access), implement your own permission service
      // and pre-filter document IDs before querying
      owner_id: { $eq: userId },
    },
  });
}

For applications where access control is more complex than owner matching — shared deals visible to a sales team, tickets accessible to a support tier, candidate profiles shared across a hiring committee — implement a separate permission service that returns the set of authorized record IDs for a given user, then use that ID list as a pre-filter before the vector search.
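The sketch below builds that pre-filter from a permission service. It is minimal and makes several assumptions. `PermissionService` is a hypothetical interface, and the `$in` operator is Pinecone-style. The cap of 1,000 IDs is an illustrative limit, not a documented one for any particular store. Very large `$in` lists tend to be slow or rejected, so the sketch fails loudly instead of truncating silently.

```typescript
// Hypothetical permission service: returns the record IDs a user may see.
interface PermissionService {
  authorizedIds(connectionId: string, userId: string, objectType: string): Promise<string[]>;
}

type PreFilter = {
  connection_id: { $eq: string };
  is_latest: { $eq: true };
  object_type: { $eq: string };
  object_id: { $in: string[] };
};

// Illustrative cap; tune it to your vector store's filter limits.
const MAX_IDS_IN_FILTER = 1000;

// Build a vector-store filter restricted to the IDs the user is authorized for.
// Returns null when the user can see nothing, so the caller can skip the query.
async function buildAuthorizedFilter(
  perms: PermissionService,
  connectionId: string,
  userId: string,
  objectType: string
): Promise<PreFilter | null> {
  const ids = await perms.authorizedIds(connectionId, userId, objectType);
  if (ids.length === 0) return null;
  if (ids.length > MAX_IDS_IN_FILTER) {
    throw new Error(`Too many authorized IDs (${ids.length}); filter on coarser metadata such as team`);
  }
  return {
    connection_id: { $eq: connectionId },
    is_latest: { $eq: true },
    object_type: { $eq: objectType },
    object_id: { $in: ids },
  };
}

// Example: a stub service where user_1 may see two deals.
const perms: PermissionService = {
  async authorizedIds(_connectionId, userId, objectType) {
    return userId === 'user_1' && objectType === 'crm_deal' ? ['deal_1', 'deal_2'] : [];
  },
};

buildAuthorizedFilter(perms, 'conn_1', 'user_1', 'crm_deal').then((f) => console.log(JSON.stringify(f)));
// {"connection_id":{"$eq":"conn_1"},"is_latest":{"$eq":true},"object_type":{"$eq":"crm_deal"},"object_id":{"$in":["deal_1","deal_2"]}}
```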

What Unified handles and what you own

The boundary is explicit:

Unified handles:

  • Authorized connections to 460+ SaaS integrations across CRM, ATS, ticketing, accounting, file storage, and additional categories
  • Backfill of existing records when a webhook subscription is created (include_all: true)
  • Change detection — native webhooks where the source API supports them, managed polling for those that don't
  • Normalized object schemas across integrations so your chunking and metadata logic doesn't need per-integration branching
  • Checkpoint and resume on delivery failures — Unified tracks state so failed runs don't restart from zero

You own:

  • Chunking strategy and chunk size decisions
  • Embedding model selection and re-embedding when the model changes
  • Vector store schema, indexing configuration, and query logic
  • The is_latest flag lifecycle in your vector store
  • Row-level permission enforcement for CRM, ATS, and ticketing categories
  • The hybrid retrieval logic for transactional fields

What to index vs what to read live

A useful heuristic for each field in a live SaaS object:

Index it if: it is textual, changes infrequently relative to query rate, and the agent uses it to find relevant records rather than to make a decision based on current state. Examples: deal description, ticket body, candidate bio, invoice memo.

Read it live if: it is a status, balance, assignee, or stage field; the agent's decision logic branches on it; or an incorrect value would cause the agent to take a wrong action. Examples: deal stage, ticket status, invoice balance, candidate pipeline position.

When in doubt, read live. The latency cost of one additional API call per query is lower than the correctness cost of an agent that acts on state that changed after the last index update.
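You can write this heuristic down per object type, so the indexer and the hybrid retrieval step read from one place. The sketch below is minimal. The `FIELD_POLICY` table is hypothetical and uses the snake_case API field names that appear in this article. Unlisted fields and unknown object types default to a live read, per the rule above.

```typescript
type FieldPolicy = { index: string[]; readLive: string[] };

// Hypothetical per-object policy derived from the index-vs-live heuristic.
const FIELD_POLICY: Record<string, FieldPolicy> = {
  crm_deal: {
    index: ['name', 'source'],
    readLive: ['stages', 'amount', 'currency', 'probability', 'user_id', 'closing_at'],
  },
  ticketing_ticket: {
    index: ['subject', 'description', 'tags'],
    readLive: ['status', 'priority', 'user_id', 'closed_at'],
  },
  ats_application: {
    index: ['source', 'rejected_reason'],
    readLive: ['status', 'hired_at', 'rejected_at'],
  },
  accounting_invoice: {
    index: ['invoice_number', 'notes'],
    readLive: ['status', 'balance_amount', 'paid_amount', 'due_at', 'paid_at'],
  },
};

// "When in doubt, read live": anything not explicitly marked indexable is read live.
function shouldReadLive(objectType: string, field: string): boolean {
  const policy = FIELD_POLICY[objectType];
  if (!policy) return true;
  return !policy.index.includes(field);
}

// Comma-separated field list for the live read, in the form the hybrid example passes as `fields`.
function liveFields(objectType: string): string | undefined {
  return FIELD_POLICY[objectType]?.readLive.join(',');
}

console.log(shouldReadLive('crm_deal', 'name')); // false
console.log(shouldReadLive('crm_deal', 'amount')); // true
console.log(liveFields('ticketing_ticket')); // status,priority,user_id,closed_at
```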

