Albert ETL Script — Complete Master Guide

Everything: Script construction, TaskMetadata structure, intervals, and multi-inventory handling


PART A: SCRIPT CONSTRUCTION BASICS


🎯 Overview

What Scripts Do

Every Albert ETL script:

  1. Reads lab output data from a file (CSV, Excel, etc.)

  2. Extracts relevant measurements and parameters

  3. Maps file columns to Albert DataColumns (DACs)

  4. Returns a standardized pandas DataFrame

The Standard Return Structure

Every script must return exactly this:

return [{
    "InvId": "",
    "Lot": "",
    "Interval": "",
    "Data": out_df  # pandas DataFrame
}]

| Field | Value | When Populated |
| --- | --- | --- |
| InvId | string | Interval scripts with 1 inventory only |
| Lot | string | Interval scripts with 1 inventory only |
| Interval | empty string | Always left empty — Albert fills this |
| Data | DataFrame | Always populated |


📋 Script Structure Checklist

Every script includes these sections:

1. Helper functions (_norm, _parse_number_or_none)
2. Load and validate file
3. Find header row by name scanning
4. Parse and normalize data
5. Filter junk/footer rows
6. Build output DataFrame
7. Return standardized structure
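
The seven sections above can be tied together in a minimal sketch. This is an illustration, not a production script: it assumes the path-based signature, a simple two-column file (Temperature, Viscosity), and simplified helpers; a real script would use the full robust parser.

```python
import pandas as pd

def _norm(s) -> str:
    # Normalize a cell for comparison: collapse whitespace, lowercase
    return " ".join(str(s).split()).lower()

def _parse_number_or_none(val):
    # Simplified stand-in for the robust parser described in this guide
    if val is None or pd.isna(val):
        return None
    s = str(val).strip()
    if s.lower() in {"", "n/a", "na", "nd", "nan"}:
        return None
    try:
        return float(s.replace(",", "."))
    except ValueError:
        return None

def ETL(raw_file_path: str, task_data=None) -> list[dict]:
    # 2. Load the raw file with no header assumption
    raw = pd.read_csv(raw_file_path, header=None, dtype=str)

    # 3. Find the header row by name scanning; never hardcode row numbers
    header_row = -1
    for i in range(len(raw)):
        cells = {_norm(v) for v in raw.iloc[i]}
        if {"temperature", "viscosity"} <= cells:
            header_row = i
            break
    if header_row == -1:
        raise ValueError("Header row with Temperature/Viscosity not found")

    body = raw.iloc[header_row + 1:].reset_index(drop=True)
    body.columns = [str(c).strip() for c in raw.iloc[header_row]]

    # 4./5. Parse rows; skip junk/footer rows with no numeric data
    rows = []
    for _, r in body.iterrows():
        visc = _parse_number_or_none(r.get("Viscosity"))
        if visc is None:
            continue  # footer/junk row
        rows.append({
            "Comments": None,  # always present, left as None
            "Temperature": str(r.get("Temperature", "")).strip(),
            "Viscosity": visc,
        })

    # 6./7. Build the output DataFrame and return the standard structure
    out_df = pd.DataFrame(rows)
    return [{"InvId": "", "Lot": "", "Interval": "", "Data": out_df}]
```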

Required Helper: _parse_number_or_none()

This robust parser handles all number formats:

| Input | Output |
| --- | --- |
| "57,95" | 57.95 (European decimal) |
| "1,234.56" | 1234.56 (thousands separator) |
| "1.15 R" | 1.15 (letter suffix) |
| "N/A", "NA", "ND" | None (null tokens) |


⚡ Key Rules (Always Follow)

Never hardcode row numbers — Scan for headers by name
Filter footer rows in Excel — Use numeric column presence
Exclude hidden DACs — Check hidden: true in payload
Include Comments column — Set to None unless instructed
Handle missing columns gracefully — Fill with None, never crash


🛡️ Error Handling with Try/Except Blocks

Why it matters: When Albert returns a generic error, you need to know exactly which part of your script failed. Wrapping code sections in try/except blocks makes debugging much easier.

Pattern: Wrap Major Processing Sections

def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    try:
        # SECTION 1: Load and parse file
        try:
            df = pd.read_csv(data_buffer)
        except Exception as e:
            raise ValueError(f"Failed to load CSV file: {str(e)}")
        
        # SECTION 2: Find header row
        try:
            header_row = _find_header_row(df, ["Temperature", "Viscosity"])
            if header_row == -1:
                raise ValueError("Required columns (Temperature, Viscosity) not found in file")
            df.columns = df.iloc[header_row]
            df = df.iloc[header_row + 1:].reset_index(drop=True)
        except Exception as e:
            raise ValueError(f"Header detection failed: {str(e)}")
        
        # SECTION 3: Parse data
        try:
            out_data = []
            for _, row in df.iterrows():
                out_data.append({
                    "Temperature": row.get("Temperature", ""),
                    "Viscosity": _parse_number_or_none(row.get("Viscosity")),
                })
        except Exception as e:
            raise ValueError(f"Data parsing failed: {str(e)}")
        
        out_df = pd.DataFrame(out_data)
        
        # SECTION 4: Handle intervals (if applicable)
        try:
            if task_metadata and task_metadata.parameter_names:
                blocks = task_metadata.split_dataframe_into_intervals(
                    dataframe=out_df,
                    interval_columns=task_metadata.parameter_names,
                )
                return blocks
        except Exception as e:
            raise ValueError(f"Interval splitting failed: {str(e)}")
        
        return [{
            "InvId": "",
            "Lot": "",
            "Interval": "",
            "Data": out_df
        }]
    
    except Exception as e:
        # Final catch-all: re-raise with context
        raise RuntimeError(f"ETL script failed: {str(e)}")

Benefits

User sees specific error message — "Header detection failed" instead of "Error in ETL"
Easier debugging — You know exactly which section broke
Better error reporting — Users can provide meaningful error feedback

Example Error Messages Users Will See

Instead of:

Error: list index out of range

They'll see:

Error: Header detection failed: Required columns (Temperature, Viscosity) not found in file


PART B: TASKMETADATA — THE RAW STRUCTURE


What TaskMetadata Contains

When Albert calls your ETL() function, you'll typically use one of these signatures:

# Standard approach (most common in Albert automation)
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    # task_metadata: TaskMetadata object
    # data_buffer: File-like object (already opened, positioned)

# Alternative approach (when working with file paths)
def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
    # raw_file_path: String path to file
    # task_data: TaskMetadata object or None

Key difference:

  • First approach: Albert opens the file and passes a file-like object (data_buffer)

  • Second approach: You handle opening the file from a path

Both receive TaskMetadata containing everything Albert knows about the current test. Parts C through F illustrate with the path-based signature, while Part A's error-handling example and Appendix A use the buffer-based one; task metadata access is identical in both.

The Raw Structure (Pseudocode)

TaskMetadata {
    # Simple properties
    block_id: str
    
    # Lists of objects
    inventory_items: list[Inventory]
    workflow_intervals: list[Interval]
    parameter_groups: list[ParameterGroup]
    parameter_names: list[str]
    
    # Complex nested structure (advanced)
    blockdata: BlockData  # Optional, may be None
    
    # Methods (server-side logic)
    split_dataframe_into_intervals(...)
    get_inventory_id_from_lot_id(...)
    get_lot_ids(...)
    get_interval_id(...)
}

Full Visual Structure

task_data (TaskMetadata)
├── block_id (str) — "BLK-uuid-12345"
│
├── inventory_items (list[Inventory])
│   ├── [0]
│   │   ├── id (str) — "INV00123"
│   │   ├── lot_id (str | None) — "LOT-2024-001"
│   │   ├── lot_number (str | None) — "Batch A"
│   │   └── barcode_id (str | None) — "BC12345"
│   ├── [1]
│   └── ...
│
├── workflow_intervals (list[Interval])
│   ├── [0]
│   │   ├── parameter_name (str) — "Temperature"
│   │   └── value (str | float) — "20"
│   ├── [1]
│   └── ...
│
├── parameter_names (list[str])
│   ├── "Temperature"
│   ├── "Frequency"
│   └── ...
│
├── parameter_groups (list[ParameterGroup])
│   ├── [0]
│   │   ├── name (str)
│   │   └── parameters (list[str])
│   └── ...
│
├── blockdata (BlockData | None)
│   ├── datatemplate (list[DataTemplate])
│   │   └── [0]
│   │       ├── id (str) — "DAT314"
│   │       └── name (str) — "PAC Properties"
│   │
│   └── workflow (list[Workflow])
│       └── [0]
│           ├── albert_id (str)
│           └── intervals (list[WorkflowInterval])
│
└── Methods:
    ├── split_dataframe_into_intervals(...)
    ├── get_inventory_id_from_lot_id(...)
    ├── get_lot_ids(...)
    └── get_interval_id(...)
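
For local testing outside Albert, a minimal mock mirroring the properties above can be handy. MockTaskMetadata is invented here for illustration: it covers only the data fields, not the server-side methods.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Inventory:
    id: str
    lot_id: Optional[str] = None
    lot_number: Optional[str] = None
    barcode_id: Optional[str] = None

@dataclass
class Interval:
    parameter_name: str
    value: str

@dataclass
class MockTaskMetadata:
    # Data fields only; split_dataframe_into_intervals etc. are server-side
    block_id: str = "BLK-test"
    inventory_items: list = field(default_factory=list)
    workflow_intervals: list = field(default_factory=list)
    parameter_names: list = field(default_factory=list)

task_data = MockTaskMetadata(
    inventory_items=[Inventory(id="INV001", lot_id="LOT-001", lot_number="Batch A")],
    workflow_intervals=[Interval("Temperature", "20"), Interval("Temperature", "50")],
    parameter_names=["Temperature"],
)
```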

Properties In Detail

1. block_id: str

Unique identifier for this test run.

task_data.block_id  # "BLKID-550e8400-e29b-41d4-a716-446655440000"

Use for: Logging, tracing which test created your output.


2. inventory_items: list[Inventory]

List of all inventory items (samples/materials) being tested.

The Inventory object:

| Field | Type | Optional? | Example |
| --- | --- | --- | --- |
| id | str | No | "INV00001" |
| lot_id | str | Yes | "LOT-2024-001" or None |
| lot_number | str | Yes | "Batch A" or None |
| barcode_id | str | Yes | "BC12345" or None |

Example:

items = task_data.inventory_items
# [
#   Inventory(id="INV001", lot_id="LOT-001", lot_number="Batch A", barcode_id="BC123"),
#   Inventory(id="INV002", lot_id="LOT-002", lot_number="Batch B", barcode_id="BC124"),
# ]

for item in items:
    inv_id = item.id
    lot_id = item.lot_id or ""  # Safe
    lot_number = item.lot_number or ""
    barcode = item.barcode_id or ""

3. workflow_intervals: list[Interval]

List of all interval definitions (conditions at which measurements were taken).

Important: Flat list, not grouped by parameter.

The Interval object:

| Field | Type | Description |
| --- | --- | --- |
| parameter_name | str | Name (e.g., "Temperature", "Time", "Frequency") |
| value | str or float | The value (typically stored as string: "20", "50.5") |

Example:

intervals = task_data.workflow_intervals
# [
#   Interval(parameter_name="Temperature", value="20"),
#   Interval(parameter_name="Temperature", value="50"),
#   Interval(parameter_name="Temperature", value="100"),
# ]

# Filter for one parameter
temps = [iv.value for iv in task_data.workflow_intervals 
         if iv.parameter_name == "Temperature"]
# ["20", "50", "100"]

4. parameter_names: list[str]

Simple list of all parameter names in the workflow.

task_data.parameter_names  # ["Temperature", "Frequency", "Time"]

Use for: Quick check if task has certain intervals, getting interval columns.


5. parameter_groups: list[ParameterGroup] (Rarely used)

Parameters organized into logical groups. Usually ignore this.


6. blockdata: BlockData | None (Advanced)

Complex nested structure. May be None. Always check before using.

if task_data.blockdata and task_data.blockdata.datatemplate:
    dt = task_data.blockdata.datatemplate[0]
    print(f"Data Template: {dt.name} (ID: {dt.id})")


PART C: TASKMETADATA METHODS


Method 1: get_inventory_id_from_lot_id(lot_id: str) -> str

Look up inventory ID from lot ID.

lot_id = "LOT-2024-001"
inv_id = task_data.get_inventory_id_from_lot_id(lot_id)
# Returns: "INV00123"

Method 2: get_lot_ids(inventory_id: str) -> list[str]

Get all lot IDs for an inventory.

inv_id = "INV00123"
lots = task_data.get_lot_ids(inv_id)
# Returns: ["LOT-2024-001", "LOT-2024-002"]

Method 3: get_interval_id(params_dict: dict[str, str]) -> str

Look up interval ID for parameter values. Values must be strings matching exactly.

params = {
    "Temperature": "50",
    "Frequency": "1000"
}

try:
    interval_id = task_data.get_interval_id(params)
except ValueError as e:
    print(f"Interval not found: {e}")

Value Formatting Rules:

# Integer parameters
{"Sample_Number": "1", "Cycle": "2"}  # Correct (strings)

# Float parameters: must match Albert's stored format exactly
{"Temperature": "50", "Frequency": "1500.5"}  # Correct only if Albert stores them this way

# One possible formatter (strips trailing zeros; verify against the
# actual stored interval values first, since Albert may store "50.000" — see Part D)
def fmt_float(f: float) -> str:
    return "{:.10f}".format(f).rstrip("0").rstrip(".")

Method 4: split_dataframe_into_intervals(...) → The Main One

Split DataFrame into multiple result blocks based on interval columns.

Signature:

def split_dataframe_into_intervals(
    dataframe: pd.DataFrame,
    interval_columns: list[str],
    inventory_id_column: str | None = None,
    lot_id_column: str | None = None,
) -> list[dict]

Returns: list[dict] — each dict is a result block.

Example 1: Simple Usage

out_df = pd.DataFrame({
    "Temperature": ["20", "50", "100"],
    "Storage_Modulus": [1000, 1200, 1500],
})

blocks = task_data.split_dataframe_into_intervals(
    dataframe=out_df,
    interval_columns=["Temperature"],
)

# Returns 3 blocks (one per temperature)
for block in blocks:
    print(f"Interval: {block['Interval']}")
    # Output: ('20',), ('50',), ('100',)

Example 2: With Inventory Auto-Population

out_df = pd.DataFrame({
    "Inventory_ID": ["INV001", "INV001", "INV002"],
    "Temperature": ["20", "50", "20"],
    "Storage_Modulus": [1000, 1200, 950],
})

blocks = task_data.split_dataframe_into_intervals(
    dataframe=out_df,
    interval_columns=["Temperature"],
    inventory_id_column="Inventory_ID",
)

# Returns 3 blocks with InvId populated
for block in blocks:
    print(f"InvId: {block['InvId']}, Interval: {block['Interval']}")

Example 3: Multi-Interval

out_df = pd.DataFrame({
    "Temperature": ["20", "20", "50", "50"],
    "Frequency": ["10", "100", "10", "100"],
    "Impedance": [1000, 950, 1100, 1050],
})

blocks = task_data.split_dataframe_into_intervals(
    dataframe=out_df,
    interval_columns=["Temperature", "Frequency"],
)

# Returns 4 blocks (one per combo)
for block in blocks:
    print(f"Interval: {block['Interval']}")
    # Output: ('20', '10'), ('20', '100'), ('50', '10'), ('50', '100')


PART D: INTERVALS


📊 What Are Intervals?

Intervals = measurements taken under different conditions or at different times.

One test produces multiple rows:

| Scenario | Conditions | Result |
| --- | --- | --- |
| Temperature sweep | 20°C, 50°C, 100°C | 3 intervals |
| Time series | 0s, 30s, 60s, 90s | 4 intervals |
| Frequency response | 10 Hz, 100 Hz, 1 kHz | 3 intervals |


⚠️ THE CRITICAL RULE FOR INTERVALS

Any script returning intervals MUST stamp InvId and Lot onto every block.

Missing these fields causes front-end validation errors. This is non-negotiable.


Standard Interval Script Pattern

def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
    
    # STEP 1: Resolve inventory from task metadata
    inv_id = ""
    lot_id = ""
    
    items = task_data.inventory_items if task_data is not None else []
    if len(items) == 1:
        inv_id = items[0].id or ""
        lot_id = items[0].lot_id or ""
    # If 0 or 2+ items, leave as empty strings — do NOT guess
    
    # STEP 2: Load file and extract data
    df = pd.read_csv(raw_file_path)
    out_df = pd.DataFrame(...)  # Your extraction logic
    
    # STEP 3: Split into interval blocks
    blocks = task_data.split_dataframe_into_intervals(
        dataframe=out_df,
        interval_columns=["Temperature", "Frequency"],
        inventory_id_column=None,
        lot_id_column=None,
    )
    
    # STEP 4: ⚠️ STAMP InvId and Lot onto EVERY block
    for block in blocks:
        block["InvId"] = inv_id
        block["Lot"] = lot_id
    
    return blocks

Inventory Rules for Intervals

1 inventory item
  ↓
  Stamp its ID and lot_id onto ALL returned blocks

2+ inventory items
  ↓
  DON'T use simple interval stamping
  Use multi-inventory approach (see Multi-Inventory section)

Note: Property tasks always have at least 1 inventory item. There is no scenario with 0 inventory.


Interval Value Formatting — Exact String Matching

Critical: Your interval values MUST match Albert's storage format exactly — character for character.

# ❌ WRONG: Stripped trailing zeros
params = {
    "Temperature": "50",      # Albert stored "50.000"
    "Frequency": "1500.5"     # Albert stored "1500.500"
}
interval_id = task_data.get_interval_id(params)  # FAILS!

# ✅ CORRECT: Match Albert's exact format
params = {
    "Temperature": "50.000",      # Matches exactly
    "Frequency": "1500.500"       # Matches exactly
}
interval_id = task_data.get_interval_id(params)  # Works!

How to check Albert's format:

  1. Look at the payload's workflow_intervals

  2. Check what values are stored

  3. Match your strings exactly

The formatter should preserve, not strip:

# ❌ WRONG: Strips trailing zeros
def _fmt_float(f: float) -> str:
    return "{:.10f}".format(f).rstrip("0").rstrip(".")
# Result: "50" instead of "50.000"

# ✅ CORRECT: Check what Albert expects
# If Albert stores "50.000", keep the zeros
def _fmt_float(f: float, decimal_places: int = 3) -> str:
    return "{:.{prec}f}".format(f, prec=decimal_places)
# Result: "50.000" matches Albert's format

Rule: Always verify against the actual interval values in task metadata, not what "seems right".


Building DataFrames for Intervals

Critical: Build output DataFrame from plain Python dicts, NOT from pandas groupby.

# ❌ WRONG
out_df = df.groupby("Temperature").agg(...).reset_index()
# Result: Temperature column contains np.float64 values

# ✅ CORRECT
rows = []
for temp in df["Temperature"].unique():
    subset = df[df["Temperature"] == temp]
    rows.append({
        "Temperature": str(int(temp)),  # Plain Python string
        "Storage_Modulus": subset["Storage_Modulus"].mean(),
    })
out_df = pd.DataFrame(rows)

Examples:

_fmt_float(10000.0)  # "10000"
_fmt_float(1500.5)   # "1500.5"
_fmt_float(0.00001)  # "0.00001"

Why? split_dataframe_into_intervals() matches values as strings. NumPy types like np.float64 convert to "50.0" but Albert stores "50" → MISMATCH.


PART E: MULTI-INVENTORY


🔀 When Does a Task Have Multiple Inventories?

When task_data.inventory_items has 2 or more entries.

items = task_data.inventory_items

if len(items) == 0:
    ...  # No inventory context (rare; see the Part D note on property tasks)
elif len(items) == 1:
    ...  # Single inventory → can stamp onto intervals
else:
    ...  # MULTI-INVENTORY → special handling required

Three Approaches

Approach A: Separate Results Per Inventory

Use when: File has labeled sections (rows 1–10 for Sample A, rows 11–20 for Sample B)

def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
    items = task_data.inventory_items if task_data else []
    results = []
    
    for item in items:
        # Extract data for THIS inventory
        item_data = extract_data_for_inventory(item.id, raw_file_path)
        
        # Build result for THIS item
        out_df = pd.DataFrame([{
            "Comments": None,
            "Force": item_data["force"],
            "Break_Type": item_data["break_type"],
        }])
        
        results.append({
            "InvId": item.id,
            "Lot": item.lot_id or "",
            "Interval": "",
            "Data": out_df
        })
    
    return results  # MULTIPLE result dicts, one per inventory

Result: Returns separate result block for each inventory item.


Approach B: Single Result with Inventory in Data

Use when: File is flat (each row = different sample with IDs embedded)

def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
    df = pd.read_csv(raw_file_path)
    
    # Extract all data into one DataFrame
    out_data = []
    for _, row in df.iterrows():
        out_data.append({
            "Comments": None,
            "Force": _parse_number_or_none(row["Force"]),
            "Inventory_ID": row.get("Sample_ID", ""),  # Track inventory in data
            "Lot_ID": row.get("Lot", ""),
        })
    
    out_df = pd.DataFrame(out_data)
    
    return [{
        "InvId": "",      # No single InvId — all in data
        "Lot": "",        # No single Lot — all in data
        "Interval": "",
        "Data": out_df
    }]

Result: One result dict. Inventory info lives in data columns.


Approach C: Multi-Inventory + Intervals (Loop Each Separately)

Use when: Each inventory is measured at multiple intervals, file has separate sections per inventory

def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
    items = task_data.inventory_items if task_data else []
    df = pd.read_csv(raw_file_path)
    
    all_blocks = []
    
    # Process EACH inventory separately
    for item in items:
        # Filter data for THIS inventory
        item_df = df[df["Sample_ID"] == item.id].copy()
        
        out_df = pd.DataFrame(...)  # Your extraction logic on item_df
        
        # Split THIS inventory's intervals
        blocks = task_data.split_dataframe_into_intervals(
            dataframe=out_df,
            interval_columns=["Temperature"],
        )
        
        # ⚠️ Stamp THIS inventory's ID onto ITS blocks
        for block in blocks:
            block["InvId"] = item.id
            block["Lot"] = item.lot_id or ""
            all_blocks.append(block)
    
    return all_blocks

Result: Multiple blocks (inventories × intervals), each stamped correctly.


Approach D: Multi-Inventory + Intervals (Using Column Parameters) — Most Common

Use when: File is flat with inventory/lot IDs as columns, alongside interval columns.

This is the most efficient approach when your data has inventory info embedded.

def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
    df = pd.read_csv(raw_file_path)
    
    # Your data looks like:
    # | Sample_ID | Lot_ID | Temperature | Viscosity | pH |
    # | INV001    | LOT001 | 20          | 1000      | 6.5 |
    # | INV001    | LOT001 | 50          | 950       | 6.4 |
    # | INV002    | LOT002 | 20          | 1050      | 6.6 |
    # | INV002    | LOT002 | 50          | 980       | 6.5 |
    
    out_df = pd.DataFrame({
        "Sample_ID": df["Sample_ID"],
        "Lot_ID": df["Lot_ID"],
        "Temperature": df["Temperature"],
        "Viscosity": df["Viscosity"],
        "pH": df["pH"],
    })
    
    # Split by temperature AND auto-populate InvId/Lot from columns
    blocks = task_data.split_dataframe_into_intervals(
        dataframe=out_df,
        interval_columns=["Temperature"],  # Split by this
        inventory_id_column="Sample_ID",   # ← Auto-populate InvId from this column
        lot_id_column="Lot_ID",            # ← Auto-populate Lot from this column
    )
    
    # ⚠️ Still need to stamp if not auto-populated
    # (Usually already populated by split_dataframe_into_intervals)
    
    return blocks

Result: Albert automatically:

  • Creates separate blocks per temperature

  • Extracts InvId from Sample_ID column

  • Extracts Lot from Lot_ID column

  • Groups by unique Sample_ID/Lot_ID combinations

Advantage: No manual looping, no manual stamping — cleaner and less error-prone.


Decision Tree

2+ inventory items?
├─ NO → Use standard single-inventory approach
└─ YES → Multi-inventory task
   │
   ├─ Are intervals involved?
   │  │
   │  ├─ NO → Use Approach B (single DataFrame, inventory in data)
   │  │
   │  └─ YES → Data has inventory IDs as columns?
   │     │
   │     ├─ YES (most common) → Use Approach D
   │     │   (Pass inventory_id_column and lot_id_column to split_dataframe_into_intervals)
   │     │
   │     └─ NO → Use Approach C
   │         (Loop each inventory separately, then split)
   │
   └─ Key rule: Never mix inventory items in a single result block

🚀 The Golden Rule

Never mix inventory items in a single result block.

If 2+ inventories exist and you're returning intervals, loop through each inventory separately. Each returned block should represent one inventory's data only.

Failure mode: Front-end doesn't know which measurements belong to which sample → data loss.



PART F: PITFALLS & FIXES


Pitfall #1: Intervals Without Inventory Info

Symptom: Albert shows validation error "InvId missing on interval block"

Root cause: Script forgot to stamp InvId and Lot onto result blocks.

Fix:

# ❌ WRONG
blocks = task_data.split_dataframe_into_intervals(...)
return blocks

# ✅ CORRECT
blocks = task_data.split_dataframe_into_intervals(...)
for block in blocks:
    block["InvId"] = inv_id  # Always stamp
    block["Lot"] = lot_id    # Always stamp
return blocks

Pitfall #2: Hidden DACs Leaking Into Output

Symptom: Extra unwanted columns appear in Data

Root cause: Script included columns with "hidden": true from payload.

Fix:

# ❌ WRONG
TARGET_COLS = [col["Name"] for col in payload["DataColumns"]]

# ✅ CORRECT
TARGET_COLS = [
    col["Name"] 
    for col in payload["DataColumns"] 
    if not col.get("hidden", False)
]

Pitfall #3: NumPy Types in Interval Columns

Symptom: Intervals don't match DT definitions; show as "unknown"

Root cause: Interval columns contain np.float64 or np.int64 instead of plain Python strings.

Fix:

# ❌ WRONG
out_df = df.groupby("Temperature").agg(...).reset_index()

# ✅ CORRECT
rows = []
for temp in df["Temperature"].unique():
    rows.append({"Temperature": str(int(temp)), ...})
out_df = pd.DataFrame(rows)

Pitfall #4: Footer Rows in Excel Files

Symptom: Last row includes legal disclaimers; corrupts numeric data

Root cause: Excel sheets have footer text below the data.

Fix:

# ❌ WRONG
out_df = df.tail(1)

# ✅ CORRECT
numeric_cols = [
    c for c in df.columns 
    if c not in ("Batch Name", "Date", "Time", "Notes")
    and not c.startswith("Unnamed")
]
df = df[df[numeric_cols].notna().any(axis=1)]

Pitfall #5: Hardcoded Row Numbers

Symptom: Script works on one file, fails on similar file with different header position

Root cause: Header detection used hardcoded row number.

Fix:

# ❌ WRONG
df = pd.read_csv(raw_file_path, header=2)

# ✅ CORRECT
def _find_header_row(df_raw, required_cells):
    for i in range(len(df_raw)):
        row_vals = set(str(v).strip() for v in df_raw.iloc[i])
        if set(required_cells).issubset(row_vals):
            return i
    return -1

Pitfall #6: Missing Column → Script Crash

Symptom: Script crashes with KeyError when optional column is missing

Root cause: Code assumes column always exists.

Fix:

# ❌ WRONG
out_data["Yellowness"] = df["Yellowness"].apply(_parse_number_or_none)

# ✅ CORRECT
if "Yellowness" in df.columns:
    out_data["Yellowness"] = df["Yellowness"].apply(_parse_number_or_none)
else:
    out_data["Yellowness"] = None

Pitfall #7: Enum Value Mismatch

Symptom: Script sets "Pass" but payload expects "PASS"; validation fails

Root cause: Enum value doesn't match payload definition exactly.

Fix:

# ❌ WRONG
out_data["Status"] = df["Status"].str.upper()

# ✅ CORRECT
VALID_STATUS = ["Pass", "Fail", "Conditional"]

def _normalize_status(val):
    norm_val = str(val).strip().lower()
    for valid in VALID_STATUS:
        if valid.lower() == norm_val:
            return valid  # Return exact payload text
    return val  # Unrecognized → pass through

out_data["Status"] = df["Status"].apply(_normalize_status)

Pitfall #8: Number Parsing Failures

Symptom: Numeric columns show as None or incorrect values

Root cause: Parser doesn't handle comma decimals, thousands separators, or null tokens.

Fix: Always use the robust _parse_number_or_none() helper.

It handles: "57,95" (European), "1,234.56" (thousands), "1.15 R" (suffix), "N/A" (nulls)



PART G: PRE-DEPLOYMENT CHECKLIST


Use this before every script goes live:

Structural Requirements

  • [ ] Script has def ETL(raw_file_path: str, task_data: TaskMetadata = None) signature

  • [ ] Script returns list[dict] with keys: InvId, Lot, Interval, Data

  • [ ] Helper functions present: _norm(), _parse_number_or_none()

  • [ ] Header detection scans for names (NOT hardcoded row number)

  • [ ] Footer/junk rows filtered for Excel files

Data Mapping

  • [ ] TARGET_COLS derived from non-hidden DACs only

  • [ ] MAPPING dictionary created and reconciled vs TARGET_COLS

  • [ ] Comments column always present and set to None

  • [ ] No hidden DACs in output

  • [ ] 100% coverage: all non-hidden DACs from payload present in TARGET_COLS

Interval-Specific (If Applicable)

  • [ ] Interval columns identified and listed

  • [ ] task_data.split_dataframe_into_intervals() called with correct params

  • [ ] InvId and Lot stamped onto EVERY returned block

  • [ ] Interval values formatted as plain Python strings

  • [ ] No NumPy types in interval columns

  • [ ] For multi-inventory intervals: separate loop through each inventory

Multi-Inventory (If Applicable)

  • [ ] Logic to distinguish between 0, 1, and 2+ inventory items

  • [ ] Inventory IDs extracted from task_data.inventory_items

  • [ ] If 2+ inventories AND intervals: separate result blocks per inventory

  • [ ] If 2+ inventories, NO intervals: Approach B or C applied clearly

  • [ ] If 0 or 2+ inventories: InvId and Lot left as empty strings

Robustness

  • [ ] All numeric parsing uses _parse_number_or_none()

  • [ ] Optional columns handled with graceful None filling

  • [ ] Enum values normalized to match payload exactly

  • [ ] File delimiters auto-detected (CSV) or specified (Excel engine)

  • [ ] No crashes on missing columns or unexpected nulls

Testing

  • [ ] Script tested on actual sample file

  • [ ] Output DataFrame columns exactly match TARGET_COLS order

  • [ ] Row counts reasonable (no silent data loss)

  • [ ] No unexpected None values (validate _parse_number_or_none() working)

  • [ ] If interval script: interval blocks each have InvId, Lot populated



APPENDIX A: HANDLING DIFFERENT FILE FORMATS


Overview

Your ETL script receives data via data_buffer: IO (or data_buffer: BytesIO), a file-like object that Albert passes already opened. Different file formats require different parsing approaches.

Key principle: Always read from data_buffer as bytes/text, never assume a file path exists.


CSV Files (Most Common)

import pandas as pd
from io import BytesIO

def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    # Standard CSV read
    df = pd.read_csv(data_buffer)
    
    return [{
        "InvId": "",
        "Lot": "",
        "Interval": "",
        "Data": df
    }]

CSV with Special Delimiters

# Tab-separated
df = pd.read_csv(data_buffer, sep='\t')

# Semicolon-separated
df = pd.read_csv(data_buffer, sep=';')

# Other delimiters
df = pd.read_csv(data_buffer, sep='|')
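
When the delimiter is unknown up front, the standard library's csv.Sniffer can detect it before handing the text to pandas. A sketch (read_csv_autodetect is a name invented here; assumes a text-decodable buffer):

```python
import csv
import pandas as pd
from io import BytesIO, StringIO

def read_csv_autodetect(data_buffer) -> pd.DataFrame:
    raw = data_buffer.read()
    text = raw.decode("utf-8", errors="replace") if isinstance(raw, bytes) else raw
    # Sniff the delimiter from the first few KB of the file
    dialect = csv.Sniffer().sniff(text[:4096], delimiters=",;\t|")
    return pd.read_csv(StringIO(text), sep=dialect.delimiter)

# Example with a semicolon-separated buffer
buf = BytesIO(b"Temperature;Viscosity\n20;1000\n50;950\n")
df = read_csv_autodetect(buf)
```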

CSV with Decimal Comma (European Format)

# German/European: "1.234,56" → 1234.56
df = pd.read_csv(data_buffer, decimal=',', sep=';')

# Convert in pandas
df['Column'] = pd.to_numeric(df['Column'], errors='coerce')

CSV with Encoding Issues

# Latin-1 encoding (handles ° and special symbols)
df = pd.read_csv(data_buffer, encoding='latin1')

# ISO-8859-1 (similar to latin1)
df = pd.read_csv(data_buffer, encoding='ISO-8859-1')

# UTF-16 (raw text decoding)
bytes_content = data_buffer.read()
text = bytes_content.decode('UTF-16')

Excel Files (.xlsx, .xls)

import pandas as pd
from io import BytesIO

def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    # Read Excel (default: first sheet)
    df = pd.read_excel(data_buffer)
    
    return [{
        "InvId": "",
        "Lot": "",
        "Interval": "",
        "Data": df
    }]

Excel with Specific Sheet

# By sheet name
df = pd.read_excel(data_buffer, sheet_name='Sheet2')

# By sheet index (0 = first sheet)
df = pd.read_excel(data_buffer, sheet_name=0)

# All sheets as dict
dfs = pd.read_excel(data_buffer, sheet_name=None)

Excel Without Headers

# No header row
df = pd.read_excel(data_buffer, header=None)

# Set column names manually
df.columns = ['Temperature', 'Viscosity', 'pH']

Excel with Specific Engine

# Openpyxl engine (recommended for modern Excel)
df = pd.read_excel(data_buffer, engine='openpyxl')

# Xlrd engine (older Excel files)
df = pd.read_excel(data_buffer, engine='xlrd')

Text Files (.txt)

Simple Text File

from io import BytesIO

def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    # Read as text
    text = data_buffer.read().decode('utf-8')
    
    # Parse lines
    lines = text.splitlines()
    
    # Extract data manually or use regex
    data = []
    for line in lines:
        if line.strip():  # Skip empty lines
            data.append(line.split())
    
    # Convert to DataFrame
    df = pd.DataFrame(data, columns=['Col1', 'Col2', 'Col3'])
    
    return [{"InvId": "", "Lot": "", "Interval": "", "Data": df}]

Tab-Separated Text

# Read as CSV with tab delimiter
df = pd.read_csv(data_buffer, sep='\t', header=None)
df.columns = ['Temperature', 'Viscosity', 'pH']

Text with Regex Parsing

import re

def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    text = data_buffer.read().decode('utf-8')
    
    # Find all numbers in a pattern
    pattern = r'Temperature:\s*(\d+)\s*Viscosity:\s*([\d.]+)'
    matches = re.findall(pattern, text)
    
    df = pd.DataFrame(matches, columns=['Temperature', 'Viscosity'])
    df = df.astype(float)
    
    return [{"InvId": "", "Lot": "", "Interval": "", "Data": df}]

JSON Files

import json
import pandas as pd

def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    # Read JSON
    text = data_buffer.read().decode('utf-8')
    data = json.loads(text)
    
    # Convert to DataFrame
    df = pd.DataFrame(data['results'])
    
    return [{"InvId": "", "Lot": "", "Interval": "", "Data": df}]

JSON Arrays

# If JSON is array of objects: [{"temp": 20, "visc": 1000}, ...]
text = data_buffer.read().decode('utf-8')
data = json.loads(text)
df = pd.DataFrame(data)

Nested JSON

# If JSON has nested structure: {"batch": {"measurements": [...]}}
data = json.loads(text)
measurements = data['batch']['measurements']
df = pd.DataFrame(measurements)

Binary/Proprietary Formats

Handling Unknown Encodings

# Try UTF-8, fall back to latin1
try:
    text = data_buffer.read().decode('utf-8')
except UnicodeDecodeError:
    data_buffer.seek(0)
    text = data_buffer.read().decode('latin1', errors='replace')

Reading Raw Bytes

# For binary formats, read as bytes
bytes_content = data_buffer.read()

# Parse as needed (e.g., struct, numpy, etc.)
import struct
# unpacking depends on your format
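As a concrete sketch of `struct`-based parsing, assume a hypothetical binary layout of fixed-size records, each holding two little-endian float64 values (the layout and field meanings are invented for illustration; a real instrument format will differ):

```python
import struct

# Hypothetical layout: consecutive (float64 temperature, float64 viscosity) pairs
raw = struct.pack('<4d', 20.0, 1000.0, 25.0, 850.0)  # stand-in for data_buffer.read()

record_size = struct.calcsize('<2d')  # 16 bytes per record
records = [
    struct.unpack_from('<2d', raw, offset)
    for offset in range(0, len(raw), record_size)
]
print(records)  # [(20.0, 1000.0), (25.0, 850.0)]
```

`struct.calcsize` keeps the stride in sync with the format string, so changing the record layout only requires editing `'<2d'` in one place.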

Common Patterns & Best Practices

Pattern 1: Auto-Detect Format

def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    # Try to read as CSV first
    try:
        df = pd.read_csv(data_buffer)
    except Exception:
        # If CSV fails, try Excel
        data_buffer.seek(0)
        try:
            df = pd.read_excel(data_buffer)
        except Exception:
            # Fall back to text parsing
            data_buffer.seek(0)
            text = data_buffer.read().decode('utf-8')
            # ... parse manually and build df here, otherwise
            # df is undefined at the return below
    
    return [{"InvId": "", "Lot": "", "Interval": "", "Data": df}]

Pattern 2: Skip Headers and Footers

# Skip first 5 rows (metadata)
df = pd.read_csv(data_buffer, skiprows=5)

# Skip last row (total/notes)
df = df.iloc[:-1]

# Skip rows where column is null
df = df[df['Temperature'].notna()]
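The "numeric column presence" footer filter from the checklist can be sketched as follows: coerce a column that should always be numeric, then keep only rows that actually parsed (the `Temperature` column name and the sample values are illustrative):

```python
import pandas as pd

# Hypothetical frame where the last row is a text footer, not data
df = pd.DataFrame({
    'Temperature': ['20.1', '25.3', 'Total'],
    'Viscosity':   ['1000', '850', 'see notes'],
})

# Coerce to numeric: footer text becomes NaN, then drop those rows
df['Temperature'] = pd.to_numeric(df['Temperature'], errors='coerce')
df = df[df['Temperature'].notna()]
print(len(df))  # 2
```

This is more robust than `iloc[:-1]` because it works no matter how many footer rows the instrument appends.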

Pattern 3: Handle Encoding + Decimals

# Common scenario: instruments configured for different locales (US vs. EU)
df = pd.read_csv(
    data_buffer,
    encoding='ISO-8859-1',   # Handles Western European special characters
    decimal=',',             # European decimal comma: 1234,56
    thousands='.',           # European thousands separator: 1.234,56
    sep=';'                  # European field separator
)

Pattern 4: Multi-Sheet Processing

def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    # Read all sheets
    all_sheets = pd.read_excel(data_buffer, sheet_name=None)
    
    results = []
    for sheet_name, df in all_sheets.items():
        # Process each sheet
        results.append({
            "InvId": "",
            "Lot": "",
            "Interval": "",
            "Data": df
        })
    
    return results

Error Handling for File Parsing

def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    try:
        # Try reading
        df = pd.read_csv(data_buffer, encoding='utf-8', decimal='.')
    except UnicodeDecodeError:
        try:
            data_buffer.seek(0)
            df = pd.read_csv(data_buffer, encoding='latin1', decimal=',')
        except Exception as e:
            raise ValueError(f"File encoding/format parsing failed: {str(e)}")
    except pd.errors.ParserError as e:
        raise ValueError(f"CSV parsing failed - check delimiter/header: {str(e)}")
    except Exception as e:
        raise ValueError(f"Unexpected file parsing error: {str(e)}")
    
    return [{"InvId": "", "Lot": "", "Interval": "", "Data": df}]

Quick Reference Table

| Format | Read Method | Common Issues |
|--------|-------------|---------------|
| CSV | pd.read_csv(data_buffer) | Delimiter, encoding, decimals |
| Excel | pd.read_excel(data_buffer) | Sheet name, engine, header row |
| Text | data_buffer.read().decode() | Encoding, line breaks, parsing |
| JSON | json.loads(data_buffer.read().decode()) | Nested structure, encoding |
| TSV | pd.read_csv(data_buffer, sep='\t') | Header, encoding |
| UTF-16 | data_buffer.read().decode('UTF-16') | Byte order mark (BOM) |


When to Escalate

Escalate to Copilot If:

  • You don't understand what columns the payload defines

  • File format is something other than CSV/Excel/PDF

  • You need to write an interval script but are unsure about multi-inventory handling

  • Number parsing is failing on a weird decimal format

  • Script runs but output doesn't match expected column names

Self-Troubleshoot If:

  • Script crashes on missing column → Use if col in df check

  • Hidden DACs in output → Filter with if not col.get("hidden")

  • Footer rows corrupting data → Use numeric column filter

  • NumPy types in interval columns → Build from plain dicts, not groupby


Key Takeaways

  1. Return structure is sacred: Always InvId, Lot, Interval, Data

  2. Intervals need inventory: Every interval block MUST have InvId and Lot stamped

  3. Multi-inventory + intervals = loop each separately: Never mix inventory items in one block

  4. Never trust file structure: Always scan for headers, filter footers, handle missing columns

  5. Parse numbers robustly: Use _parse_number_or_none() for all numerics

  6. Hidden columns are invisible: Filter before building output

  7. Leave intervals to Albert: Your script builds interval columns; Albert creates the Interval field

  8. TaskMetadata is your friend: Use it for inventory, intervals, and validation

  9. Stamp before returning: Always stamp InvId/Lot on interval blocks

  10. Test on real data: Use actual sample files before deployment
