AI / Retail
7 min read
Apr 29, 2025

Data Engineering for Multi-System Imports: Standard Format Across POS Systems

How Afto ingests and normalizes product, customer, and transaction data from five different POS systems into a single canonical format using Dagster pipelines, with deduplication and incremental sync at scale.

Data Engineering, Dagster, ETL, POS Systems, Data Pipeline, Normalization, PostgreSQL

Afto works with SMB retailers who use whatever POS system their accountant recommended years ago. We onboard businesses running Shopify, WooCommerce, Loyverse, Marg ERP, and custom Excel-based systems. Each one exports data in a completely different format. Getting all of them into a single queryable store—without data loss or duplication—was one of the more interesting engineering challenges on the project.

The Core Problem

Each POS system has its own shape for the same conceptual data:

Shopify customer:
{
  "id": "gid://shopify/Customer/123",
  "email": "alice@example.com",
  "first_name": "Alice",
  "last_name": "Sharma",
  "orders_count": 5,
  "total_spent": "2400.00"
}

Loyverse customer:
{
  "customer_id": "c_7e3a",
  "name": "Alice Sharma",
  "phone": "+919876543210",
  "total_visits": 5,
  "total_spent": 2400
}

WooCommerce customer:
{
  "id": 456,
  "billing": {
    "first_name": "Alice",
    "last_name": "Sharma",
    "email": "alice@example.com",
    "phone": "9876543210"
  },
  "orders": [...]
}

Three systems, three ways to represent the same person. Our job was to define a canonical schema and write adapters for each system.
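A minimal Python sketch of that mapping step for the three sample payloads, using only the field names shown in them. The `map_*` functions are hypothetical, not Afto's adapter code. Amounts stay in the source currency as `Decimal`, and the WooCommerce version assumes each element of `orders` carries a string `total` (the sample elides the orders). Stable IDs and phone normalization come in later steps.

```python
from decimal import Decimal
from typing import Any


def map_shopify(c: dict[str, Any]) -> dict[str, Any]:
    # Shopify splits the name and sends money as a decimal string.
    name = " ".join(p for p in (c.get("first_name"), c.get("last_name")) if p)
    return {
        "source_system": "shopify",
        "source_id": str(c["id"]),
        "name": name,
        "email": c.get("email") or None,
        "phone": c.get("phone") or None,
        "total_purchases": c.get("orders_count", 0),
        "total_spend": Decimal(str(c.get("total_spent", "0"))),
        "raw": c,
    }


def map_loyverse(c: dict[str, Any]) -> dict[str, Any]:
    # Loyverse has a single name field and counts visits rather than orders.
    return {
        "source_system": "loyverse",
        "source_id": str(c["customer_id"]),
        "name": (c.get("name") or "").strip(),
        "email": c.get("email") or None,
        "phone": c.get("phone") or None,
        "total_purchases": c.get("total_visits", 0),
        "total_spend": Decimal(str(c.get("total_spent", 0))),
        "raw": c,
    }


def map_woocommerce(c: dict[str, Any]) -> dict[str, Any]:
    # WooCommerce nests contact details under "billing".
    billing = c.get("billing") or {}
    orders = c.get("orders") or []
    name = " ".join(p for p in (billing.get("first_name"), billing.get("last_name")) if p)
    return {
        "source_system": "woocommerce",
        "source_id": str(c["id"]),
        "name": name,
        "email": billing.get("email") or None,
        "phone": billing.get("phone") or None,
        "total_purchases": len(orders),
        # Assumption: each order dict has a string "total" field.
        "total_spend": sum((Decimal(str(o.get("total", "0"))) for o in orders), Decimal("0")),
        "raw": c,
    }
```

Keeping each mapper a pure function of one payload makes it easy to unit-test against captured samples from each POS.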

The Canonical Data Model

We designed the canonical model to be the union of what matters across all systems, with nullable fields for system-specific data:

type CanonicalCustomer = {
  afto_id: string;         // our internal UUID
  source_system: string;   // "shopify" | "loyverse" | "woocommerce" | ...
  source_id: string;       // original ID in the source system
  name: string;
  email?: string;
  phone?: string;
  total_purchases: number;
  total_spend_inr: number;
  last_purchase_date?: Date;
  tags: string[];
  raw: Record<string, unknown>; // original payload for debugging
};
 
type CanonicalProduct = {
  afto_id: string;
  source_system: string;
  source_id: string;
  name: string;
  sku?: string;
  price_inr: number;
  category?: string;
  in_stock: boolean;
  quantity_available?: number;
  image_url?: string;
  raw: Record<string, unknown>;
};
 
type CanonicalTransaction = {
  afto_id: string;
  source_system: string;
  source_id: string;
  customer_afto_id?: string;
  items: Array<{
    product_afto_id: string;
    quantity: number;
    unit_price_inr: number;
  }>;
  total_inr: number;
  transacted_at: Date;
  payment_method?: string;
  status: "completed" | "refunded" | "cancelled";
};

The raw field was a conscious decision: it means we can always re-derive canonical fields if we fix a parsing bug, without re-fetching from the source.
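To illustrate why keeping `raw` pays off, here is a hedged Python sketch of a backfill. After a parser bug is fixed, each stored row's derived fields are rebuilt from its saved payload, with no request to the source system. The `parse_name_v1`/`parse_name_v2` functions, the row layout, and the specific bug (dropping the last name) are hypothetical.

```python
from typing import Any, Callable

Row = dict[str, Any]


def parse_name_v1(raw: Row) -> str:
    # Buggy first version: ignores last_name.
    return raw.get("first_name") or ""


def parse_name_v2(raw: Row) -> str:
    # Fixed version: joins whichever name parts are present.
    return " ".join(p for p in (raw.get("first_name"), raw.get("last_name")) if p)


def rederive(rows: list[Row], parse_name: Callable[[Row], str]) -> list[Row]:
    """Rebuild derived fields from each row's stored raw payload.

    afto_id and raw are carried over unchanged; only derived fields are
    recomputed. Returns new dicts and leaves the input rows untouched.
    """
    return [{**row, "name": parse_name(row["raw"])} for row in rows]
```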

POS Adapters

Each POS system gets its own adapter module implementing a common interface:

interface POSAdapter {
  fetchCustomers(since?: Date): AsyncGenerator<CanonicalCustomer>;
  fetchProducts(since?: Date): AsyncGenerator<CanonicalProduct>;
  fetchTransactions(since?: Date): AsyncGenerator<CanonicalTransaction>;
}

Using AsyncGenerator lets us stream large datasets without holding everything in memory. Here's the Shopify adapter for customers:

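class ShopifyAdapter implements POSAdapter {
  // shopify: API client wrapper exposing customer.list() (ShopifyClient is our own type).
  // fxToInr: conversion rate from the store's currency; 1 if the store already reports in INR.
  constructor(
    private readonly shopify: ShopifyClient,
    private readonly fxToInr: number,
  ) {}

  async *fetchCustomers(since?: Date): AsyncGenerator<CanonicalCustomer> {
    let cursor: string | undefined;

    do {
      // Shopify's cursor pagination rejects filters other than limit/fields alongside
      // page_info, so updated_at_min goes only on the first request; later pages keep it.
      const { customers, pageInfo } = await this.shopify.customer.list(
        cursor
          ? { limit: 250, page_info: cursor }
          : { limit: 250, updated_at_min: since?.toISOString() },
      );

      for (const c of customers) {
        yield {
          afto_id: generateDeterministicId("shopify", c.id),
          source_system: "shopify",
          source_id: String(c.id),
          // Shopify returns null for missing name parts; skip them rather than printing "null".
          name: [c.first_name, c.last_name].filter(Boolean).join(" "),
          email: c.email || undefined,
          phone: c.phone || undefined,
          total_purchases: c.orders_count,
          // total_spent is a decimal string in the store's currency.
          total_spend_inr: Math.round(parseFloat(c.total_spent) * this.fxToInr),
          last_purchase_date: c.last_order ? new Date(c.last_order.created_at) : undefined,
          tags: c.tags ? c.tags.split(",").map((t) => t.trim()) : [],
          raw: c,
        };
      }

      cursor = pageInfo.hasNextPage ? pageInfo.endCursor : undefined;
    } while (cursor);
  }
}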
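The streaming behaviour of an async generator can be sketched in Python against a fake paginated source. `FakePagedSource` and its integer cursor are hypothetical stand-ins for a POS API. The point is that pages are fetched lazily: only one page is in memory at a time, and a consumer that stops early never triggers further API calls.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


class FakePagedSource:
    """Hypothetical stand-in for a POS API that returns one page per request."""

    def __init__(self, total: int, page_size: int) -> None:
        self.records = [{"id": i} for i in range(total)]
        self.page_size = page_size
        self.requests = 0  # counts simulated API calls

    async def list_page(
        self, cursor: Optional[int]
    ) -> tuple[list[dict[str, Any]], Optional[int]]:
        self.requests += 1
        await asyncio.sleep(0)  # stands in for network latency
        start = cursor or 0
        end = start + self.page_size
        next_cursor = end if end < len(self.records) else None
        return self.records[start:end], next_cursor


async def stream_records(source: FakePagedSource) -> AsyncIterator[dict[str, Any]]:
    # The next page is requested only when the consumer asks for more records.
    cursor: Optional[int] = None
    while True:
        page, cursor = await source.list_page(cursor)
        for record in page:
            yield record
        if cursor is None:
            break
```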

Deterministic ID generation is what keeps repeated imports from creating duplicates. We hash (source_system, source_id) with SHA-256 to get a stable afto_id, so the same entity gets the same ID on every import and a re-sync updates the existing row instead of inserting a new one.
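A Python sketch of such a helper, assuming a delimited `source_system`/`source_id` key and truncating the SHA-256 digest to 128 bits so the result fits a UUID column. The name mirrors `generateDeterministicId` from the adapter, but the key format and truncation are assumptions; the output is UUID-shaped, not a versioned RFC 4122 UUID.

```python
import hashlib
import uuid


def generate_deterministic_id(source_system: str, source_id: object) -> str:
    """Stable ID derived from (source_system, source_id).

    The unit-separator delimiter keeps ("ab", "c") and ("a", "bc") from
    producing the same hash input. source_id is stringified, so 123 and
    "123" map to the same ID.
    """
    key = f"{source_system}\x1f{source_id}".encode("utf-8")
    digest = hashlib.sha256(key).digest()
    return str(uuid.UUID(bytes=digest[:16]))
```

One thing to check with this key: WooCommerce customer IDs such as `456` are assigned per store, so two retailers on WooCommerce can share a source_id. If afto_id must be unique across all retailers, folding the retailer ID into the hashed key (for example `generate_deterministic_id("woocommerce", f"{retailer_id}:456")`) avoids that collision.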

Dagster Pipeline Architecture

We orchestrate all imports with Dagster. Each retailer's import is an independent job with assets for each entity type:

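import asyncio

import pandas as pd
from dagster import AssetIn, DynamicPartitionsDefinition, Output, asset

# One partition per retailer; keys are added as retailers onboard.
retailer_partitions = DynamicPartitionsDefinition(name="retailers")

# Project helpers (not shown): get_retailer_config, get_last_sync_time,
# update_last_sync_time, upsert_to_postgres, and a Python ShopifyAdapter.


@asset(
    group_name="shopify_sync",
    partitions_def=retailer_partitions,
)
def shopify_customers(context) -> Output[pd.DataFrame]:
    retailer_id = context.partition_key
    config = get_retailer_config(retailer_id)
    adapter = ShopifyAdapter(config)
    since = get_last_sync_time(retailer_id, "customers")

    # The asset body is synchronous, so drain the async generator in an event loop.
    async def collect() -> list:
        return [c async for c in adapter.fetch_customers(since=since)]

    df = pd.DataFrame(asyncio.run(collect()))
    context.log.info(f"Fetched {len(df)} customers for retailer {retailer_id}")

    return Output(
        value=df,
        metadata={"num_records": len(df), "retailer_id": retailer_id},
    )


@asset(
    group_name="shopify_sync",
    partitions_def=retailer_partitions,  # must match upstream to use context.partition_key
    ins={"customers": AssetIn("shopify_customers")},
)
def upsert_customers(context, customers: pd.DataFrame) -> None:
    upsert_to_postgres(customers, "canonical_customers")
    update_last_sync_time(context.partition_key, "customers")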

We partition by retailer_id, so a failure for one retailer doesn't block others. Dagster's asset graph makes it easy to see exactly which retailer-entity combinations are stale.
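The `upsert_to_postgres` helper isn't shown. A minimal sketch of the idea, assuming `afto_id` is the table's primary key, is an `INSERT ... ON CONFLICT (afto_id) DO UPDATE` statement, which makes re-running an import idempotent. It uses Python's built-in `sqlite3` as a stand-in so it runs without a database server; SQLite (3.24+) accepts the same `ON CONFLICT ... DO UPDATE SET col = excluded.col` form as PostgreSQL. The column list is hypothetical.

```python
import sqlite3

UPSERT_SQL = """
INSERT INTO canonical_customers (afto_id, retailer_id, name, email, total_spend_inr)
VALUES (:afto_id, :retailer_id, :name, :email, :total_spend_inr)
ON CONFLICT (afto_id) DO UPDATE SET
    name = excluded.name,
    email = excluded.email,
    total_spend_inr = excluded.total_spend_inr
"""


def upsert_customers(conn: sqlite3.Connection, rows: list[dict]) -> None:
    # Re-importing an entity hits the same afto_id, so the row is updated in
    # place rather than duplicated.
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT_SQL, rows)
```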

Deduplication Strategy

The hardest problem was customers appearing in multiple systems. A retailer might use Shopify for online orders and Loyverse for in-store—the same customer appears in both, with slightly different phone/email formats.

We run a post-import deduplication pass that clusters records by:

  1. Exact email match
  2. Normalized phone number match (strip country code, strip spaces)
  3. Fuzzy name match with Levenshtein distance ≤ 2 AND same phone prefix
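The three matching rules can be sketched in Python as a pairwise predicate. `normalize_phone` assumes Indian numbers (country code 91, 10-digit subscriber numbers), the email comparison is case-insensitive, and "same phone prefix" is read as the first five digits of the normalized number. All three choices are assumptions; the post doesn't give the exact prefix length.

```python
import re
from typing import Optional


def normalize_phone(phone: Optional[str]) -> Optional[str]:
    """Digits only, with a leading country code 91 or trunk 0 removed.

    Assumption: Indian numbers with 10-digit subscriber numbers.
    """
    if not phone:
        return None
    digits = re.sub(r"\D", "", phone)
    if len(digits) == 12 and digits.startswith("91"):
        digits = digits[2:]
    elif len(digits) == 11 and digits.startswith("0"):
        digits = digits[1:]
    return digits or None


def levenshtein(a: str, b: str) -> int:
    # Dynamic-programming edit distance, keeping one row at a time.
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            curr.append(min(
                prev[j] + 1,               # deletion
                curr[j - 1] + 1,           # insertion
                prev[j - 1] + (ca != cb),  # substitution
            ))
        prev = curr
    return prev[-1]


def is_same_customer(a: dict, b: dict, prefix_len: int = 5) -> bool:
    # Rule 1: email match (case-insensitive).
    if a.get("email") and b.get("email") and a["email"].lower() == b["email"].lower():
        return True
    pa, pb = normalize_phone(a.get("phone")), normalize_phone(b.get("phone"))
    # Rule 2: normalized phone match.
    if pa and pb and pa == pb:
        return True
    # Rule 3: edit distance <= 2 on names AND same phone prefix.
    if pa and pb and pa[:prefix_len] == pb[:prefix_len]:
        name_a = (a.get("name") or "").strip().lower()
        name_b = (b.get("name") or "").strip().lower()
        return bool(name_a) and levenshtein(name_a, name_b) <= 2
    return False
```

Comparing every pair is quadratic, so in practice candidates would first be grouped by phone prefix before running rule 3.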

When a cluster is found, we merge the records, keeping the most recently updated values for mutable fields (name, contact info) and summing numeric fields (purchases, spend):

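async function deduplicateCustomers(retailerId: string): Promise<void> {
  // One pass per exact-match rule. Grouping by (email, phone) together only catches
  // records that match on both, and GROUP BY treats NULLs as equal, which would
  // merge every customer without contact details into a single record.
  const matchKeys = ["lower(email)", "normalize_phone(phone)"];

  for (const key of matchKeys) {
    // key is one of the constant expressions above, never user input.
    const dupes = await db.query(
      `SELECT array_agg(afto_id ORDER BY updated_at DESC) AS ids
       FROM canonical_customers
       WHERE retailer_id = $1 AND ${key} IS NOT NULL AND ${key} <> ''
       GROUP BY ${key}
       HAVING COUNT(*) > 1`,
      [retailerId]
    );

    for (const { ids } of dupes.rows) {
      // The most recently updated record is the primary; the rest merge into it.
      const [primary, ...duplicates] = ids;
      await mergeCustomerRecords(primary, duplicates);
    }
  }
  // Rule 3 (fuzzy name + phone prefix) needs pairwise comparison and is not shown here.
}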
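`mergeCustomerRecords` isn't shown either. A Python sketch of the merge rule described above, assuming each record has an `updated_at` timestamp: mutable fields take the newest non-empty value in the cluster, numeric totals are summed, and the newest record's afto_id survives. Falling back to older values when the newest is empty, taking the latest `last_purchase_date`, and the `merged_from` field are assumptions.

```python
from datetime import datetime
from typing import Any

MUTABLE_FIELDS = ("name", "email", "phone")
SUMMED_FIELDS = ("total_purchases", "total_spend_inr")


def merge_customer_records(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Merge one duplicate cluster into a single record."""
    # Newest first; the newest record is the primary.
    ordered = sorted(records, key=lambda r: r["updated_at"], reverse=True)
    merged = dict(ordered[0])
    for field in MUTABLE_FIELDS:
        # Newest non-empty value wins, so an older email survives if the newer record lacks one.
        merged[field] = next((r[field] for r in ordered if r.get(field)), None)
    for field in SUMMED_FIELDS:
        merged[field] = sum(r.get(field) or 0 for r in ordered)
    dates: list[datetime] = [r["last_purchase_date"] for r in ordered if r.get("last_purchase_date")]
    merged["last_purchase_date"] = max(dates) if dates else None
    merged["merged_from"] = [r["afto_id"] for r in ordered[1:]]
    return merged
```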

Results

After the pipeline went live with five pilot retailers, a full initial sync of 10,000+ customers went from a manual 3-day process (connecting to each system separately and cleaning the data in Excel) to an automated run of under 20 minutes. Incremental syncs run every 4 hours and take under 2 minutes per retailer.

Deduplication caught about 12% duplicate customers on average across the pilot set, mostly people who had shopped both in-store and online. Without it, every campaign would have sent duplicate messages to those customers.

Key Takeaways

  • Define the canonical schema before writing adapters, not after. Retrofitting a schema onto existing adapters is painful.
  • AsyncGenerator for streaming is essential when source systems don't paginate cleanly or have large datasets.
  • Deterministic entity IDs via hashing eliminate the need for complex reconciliation logic between imports.
  • Deduplication is not optional for multi-system imports—expect 10-15% duplicates when combining in-store and online data.
  • Dagster's partitioned assets give you per-retailer isolation and failure visibility that's hard to replicate with cron jobs.