Albert ETL Script — Complete Master Guide
Everything: Script construction, TaskMetadata structure, intervals, and multi-inventory handling
PART A: SCRIPT CONSTRUCTION BASICS
🎯 Overview
What Scripts Do
Every Albert ETL script:
Reads lab output data from a file (CSV, Excel, etc.)
Extracts relevant measurements and parameters
Maps file columns to Albert DataColumns (DACs)
Returns a standardized pandas DataFrame
The Standard Return Structure
Every script must return exactly this:
return [{
"InvId": "",
"Lot": "",
"Interval": "",
"Data": out_df # pandas DataFrame
}]
Field | Value | When Populated |
|---|---|---|
InvId | string | Interval scripts with 1 inventory only |
Lot | string | Interval scripts with 1 inventory only |
Interval | empty string | Always left empty — Albert fills this |
Data | DataFrame | Always populated |
📋 Script Structure Checklist
Every script includes these sections:
1. Helper functions (_norm, _parse_number_or_none)
2. Load and validate file
3. Find header row by name scanning
4. Parse and normalize data
5. Filter junk/footer rows
6. Build output DataFrame
7. Return standardized structure
Required Helper: _parse_number_or_none()
This robust parser handles the number formats commonly seen in lab exports:
Input | Output |
|---|---|
"57,95" (European decimal comma) | 57.95 |
"1,234.56" (thousands separator) | 1234.56 |
"1.15 R" (trailing suffix) | 1.15 |
"N/A", "-", "" (null tokens) | None |
"10000" | 10000.0 |
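The guide assumes these helpers exist but never defines them. A minimal sketch consistent with the formats above and with Pitfall #8 — the regex-based suffix handling and the exact null-token set are illustrative choices, not Albert's canonical implementation:
import re

_NULL_TOKENS = {"", "n/a", "na", "none", "null", "-", "--"}

def _norm(s) -> str:
    """Normalize a header/cell for comparison: stringified, trimmed, lowercased."""
    return str(s).strip().lower()

def _parse_number_or_none(val):
    """Parse a raw cell into a float, or None for null-like tokens."""
    if val is None:
        return None
    s = str(val).strip()
    if s.lower() in _NULL_TOKENS:
        return None
    m = re.search(r"[-+]?\d[\d.,]*", s)  # tolerates suffixes like "1.15 R"
    if not m:
        return None
    num = m.group(0)
    if "," in num and "." in num:
        num = num.replace(",", "")   # "1,234.56" → thousands separators
    elif "," in num:
        num = num.replace(",", ".")  # "57,95" → European decimal comma
    try:
        return float(num)
    except ValueError:
        return None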
⚡ Key Rules (Always Follow)
✓ Never hardcode row numbers — Scan for headers by name
✓ Filter footer rows in Excel — Use numeric column presence
✓ Exclude hidden DACs — Check hidden: true in payload
✓ Include Comments column — Set to None unless instructed
✓ Handle missing columns gracefully — Fill with None, never crash
🛡️ Error Handling with Try/Except Blocks
Why it matters: When Albert returns a generic error, you need to know exactly which part of your script failed. Wrapping code sections in try/except blocks makes debugging much easier.
Pattern: Wrap Major Processing Sections
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
try:
# SECTION 1: Load and parse file
try:
df = pd.read_csv(data_buffer)
except Exception as e:
raise ValueError(f"Failed to load CSV file: {str(e)}")
# SECTION 2: Find header row
try:
header_row = _find_header_row(df, ["Temperature", "Viscosity"])
if header_row == -1:
raise ValueError("Required columns (Temperature, Viscosity) not found in file")
df = df.iloc[header_row:].reset_index(drop=True)
except Exception as e:
raise ValueError(f"Header detection failed: {str(e)}")
# SECTION 3: Parse data
try:
out_data = []
for _, row in df.iterrows():
out_data.append({
"Temperature": row.get("Temperature", ""),
"Viscosity": _parse_number_or_none(row.get("Viscosity")),
})
except Exception as e:
raise ValueError(f"Data parsing failed: {str(e)}")
out_df = pd.DataFrame(out_data)
# SECTION 4: Handle intervals (if applicable)
try:
if task_metadata and task_metadata.parameter_names:
blocks = task_metadata.split_dataframe_into_intervals(
dataframe=out_df,
interval_columns=task_metadata.parameter_names,
)
return blocks
except Exception as e:
raise ValueError(f"Interval splitting failed: {str(e)}")
return [{
"InvId": "",
"Lot": "",
"Interval": "",
"Data": out_df
}]
except Exception as e:
# Final catch-all: re-raise with context
raise RuntimeError(f"ETL script failed: {str(e)}")
Benefits
✓ User sees specific error message — "Header detection failed" instead of "Error in ETL"
✓ Easier debugging — You know exactly which section broke
✓ Better error reporting — Users can provide meaningful error feedback
Example Error Messages Users Will See
Instead of:
Error: list index out of range
They'll see:
Error: Header detection failed: Required columns (Temperature, Viscosity) not found in file
PART B: TASKMETADATA — THE RAW STRUCTURE
What TaskMetadata Contains
When Albert calls your ETL() function, you'll typically use one of these signatures:
# Standard approach (most common in Albert automation)
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
# task_metadata: TaskMetadata object
# data_buffer: File-like object (already opened, positioned)
# Alternative approach (when working with file paths)
def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
# raw_file_path: String path to file
# task_data: TaskMetadata object or None
Key difference:
First approach: Albert opens the file and passes a file-like object (data_buffer)
Second approach: You handle opening the file from a path
Both receive TaskMetadata containing everything Albert knows about the current test. The examples in this guide use the second approach, but both work identically for accessing task metadata.
The Raw Structure (Pseudocode)
TaskMetadata {
# Simple properties
block_id: str
# Lists of objects
inventory_items: list[Inventory]
workflow_intervals: list[Interval]
parameter_groups: list[ParameterGroup]
parameter_names: list[str]
# Complex nested structure (advanced)
blockdata: BlockData # Optional, may be None
# Methods (server-side logic)
split_dataframe_into_intervals(...)
get_inventory_id_from_lot_id(...)
get_lot_ids(...)
get_interval_id(...)
}
Full Visual Structure
task_data (TaskMetadata)
├── block_id (str) — "BLK-uuid-12345"
│
├── inventory_items (list[Inventory])
│ ├── [0]
│ │ ├── id (str) — "INV00123"
│ │ ├── lot_id (str | None) — "LOT-2024-001"
│ │ ├── lot_number (str | None) — "Batch A"
│ │ └── barcode_id (str | None) — "BC12345"
│ ├── [1]
│ └── ...
│
├── workflow_intervals (list[Interval])
│ ├── [0]
│ │ ├── parameter_name (str) — "Temperature"
│ │ └── value (str | float) — "20"
│ ├── [1]
│ └── ...
│
├── parameter_names (list[str])
│ ├── "Temperature"
│ ├── "Frequency"
│ └── ...
│
├── parameter_groups (list[ParameterGroup])
│ ├── [0]
│ │ ├── name (str)
│ │ └── parameters (list[str])
│ └── ...
│
├── blockdata (BlockData | None)
│ ├── datatemplate (list[DataTemplate])
│ │ └── [0]
│ │ ├── id (str) — "DAT314"
│ │ └── name (str) — "PAC Properties"
│ │
│ └── workflow (list[Workflow])
│ └── [0]
│ ├── albert_id (str)
│ └── intervals (list[WorkflowInterval])
│
└── Methods:
├── split_dataframe_into_intervals(...)
├── get_inventory_id_from_lot_id(...)
├── get_lot_ids(...)
└── get_interval_id(...)
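For testing scripts locally outside Albert, a minimal stub can stand in for this structure. A sketch only — the real TaskMetadata class is supplied by Albert and also implements the methods listed above:
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Inventory:
    id: str
    lot_id: Optional[str] = None
    lot_number: Optional[str] = None
    barcode_id: Optional[str] = None

@dataclass
class Interval:
    parameter_name: str
    value: str

@dataclass
class TaskMetadataStub:
    block_id: str = ""
    inventory_items: list = field(default_factory=list)
    workflow_intervals: list = field(default_factory=list)
    parameter_names: list = field(default_factory=list)

# Example: exercise an ETL script without a live Albert task
stub = TaskMetadataStub(
    inventory_items=[Inventory(id="INV001", lot_id="LOT-2024-001")],
    workflow_intervals=[Interval("Temperature", "20"), Interval("Temperature", "50")],
    parameter_names=["Temperature"],
)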
Properties In Detail
1. block_id: str
Unique identifier for this test run.
task_data.block_id # "BLKID-550e8400-e29b-41d4-a716-446655440000"
Use for: Logging, tracing which test created your output.
2. inventory_items: list[Inventory]
List of all inventory items (samples/materials) being tested.
The Inventory object:
Field | Type | Optional? | Example |
|---|---|---|---|
id | str | No | "INV00123" |
lot_id | str | Yes | "LOT-2024-001" |
lot_number | str | Yes | "Batch A" |
barcode_id | str | Yes | "BC12345" |
Example:
items = task_data.inventory_items
# [
# Inventory(id="INV001", lot_id="LOT-001", lot_number="Batch A", barcode_id="BC123"),
# Inventory(id="INV002", lot_id="LOT-002", lot_number="Batch B", barcode_id="BC124"),
# ]
for item in items:
inv_id = item.id
lot_id = item.lot_id or "" # Safe
lot_number = item.lot_number or ""
barcode = item.barcode_id or ""
3. workflow_intervals: list[Interval]
List of all interval definitions (conditions at which measurements were taken).
Important: Flat list, not grouped by parameter.
The Interval object:
Field | Type | Description |
|---|---|---|
parameter_name | str | Name (e.g., "Temperature", "Time", "Frequency") |
value | str or float | The value (typically stored as string: "20", "50.5") |
Example:
intervals = task_data.workflow_intervals
# [
# Interval(parameter_name="Temperature", value="20"),
# Interval(parameter_name="Temperature", value="50"),
# Interval(parameter_name="Temperature", value="100"),
# ]
# Filter for one parameter
temps = [iv.value for iv in task_data.workflow_intervals
if iv.parameter_name == "Temperature"]
# ["20", "50", "100"]
4. parameter_names: list[str]
Simple list of all parameter names in the workflow.
task_data.parameter_names # ["Temperature", "Frequency", "Time"]
Use for: Quick check if task has certain intervals, getting interval columns.
5. parameter_groups: list[ParameterGroup] (Rarely used)
Parameters organized into logical groups. Usually ignore this.
6. blockdata: BlockData | None (Advanced)
Complex nested structure. May be None. Always check before using.
if task_data.blockdata and task_data.blockdata.datatemplate:
dt = task_data.blockdata.datatemplate[0]
print(f"Data Template: {dt.name} (ID: {dt.id})")
PART C: TASKMETADATA METHODS
Method 1: get_inventory_id_from_lot_id(lot_id: str) -> str
Look up inventory ID from lot ID.
lot_id = "LOT-2024-001"
inv_id = task_data.get_inventory_id_from_lot_id(lot_id)
# Returns: "INV00123"
Method 2: get_lot_ids(inventory_id: str) -> list[str]
Get all lot IDs for an inventory.
inv_id = "INV00123"
lots = task_data.get_lot_ids(inv_id)
# Returns: ["LOT-2024-001", "LOT-2024-002"]
Method 3: get_interval_id(params_dict: dict[str, str]) -> str
Look up interval ID for parameter values. Values must be strings matching exactly.
params = {
"Temperature": "50",
"Frequency": "1000"
}
try:
interval_id = task_data.get_interval_id(params)
except ValueError as e:
print(f"Interval not found: {e}")
Value Formatting Rules:
# Integer parameters
{"Sample_Number": "1", "Cycle": "2"}  # Correct (strings)
# Float parameters — typically no trailing zeros
{"Temperature": "50", "Frequency": "1500.5"}  # Correct if Albert stored these exact strings
# A zero-stripping formatter for floats
def fmt_float(f: float) -> str:
    return "{:.10f}".format(f).rstrip("0").rstrip(".")
# Caution: if Albert stored values WITH trailing zeros ("50.000"), match that
# format instead — see "Interval Value Formatting — Exact String Matching" in Part D.
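Putting it together — a hedged sketch using the fmt_float formatter above. This assumes the zero-stripped form is what Albert stored; verify against Part D before relying on it:
params = {
    "Temperature": fmt_float(50.0),   # "50"
    "Frequency": fmt_float(1500.5),   # "1500.5"
}
try:
    interval_id = task_data.get_interval_id(params)
except ValueError:
    interval_id = None  # no interval matches these exact strings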
Method 4: split_dataframe_into_intervals(...) → The Main One
Split DataFrame into multiple result blocks based on interval columns.
Signature:
blocks = task_data.split_dataframe_into_intervals(
dataframe: pd.DataFrame,
interval_columns: list[str],
inventory_id_column: str | None = None,
lot_id_column: str | None = None,
) -> list[dict]
Returns: list[dict] — each dict is a result block.
Example 1: Simple Usage
out_df = pd.DataFrame({
"Temperature": ["20", "50", "100"],
"Storage_Modulus": [1000, 1200, 1500],
})
blocks = task_data.split_dataframe_into_intervals(
dataframe=out_df,
interval_columns=["Temperature"],
)
# Returns 3 blocks (one per temperature)
for block in blocks:
print(f"Interval: {block['Interval']}")
# Output: ('20',), ('50',), ('100',)
Example 2: With Inventory Auto-Population
out_df = pd.DataFrame({
"Inventory_ID": ["INV001", "INV001", "INV002"],
"Temperature": ["20", "50", "20"],
"Storage_Modulus": [1000, 1200, 950],
})
blocks = task_data.split_dataframe_into_intervals(
dataframe=out_df,
interval_columns=["Temperature"],
inventory_id_column="Inventory_ID",
)
# Returns 3 blocks with InvId populated
for block in blocks:
print(f"InvId: {block['InvId']}, Interval: {block['Interval']}")
Example 3: Multi-Interval
out_df = pd.DataFrame({
"Temperature": ["20", "20", "50", "50"],
"Frequency": ["10", "100", "10", "100"],
"Impedance": [1000, 950, 1100, 1050],
})
blocks = task_data.split_dataframe_into_intervals(
dataframe=out_df,
interval_columns=["Temperature", "Frequency"],
)
# Returns 4 blocks (one per combo)
for block in blocks:
print(f"Interval: {block['Interval']}")
# Output: ('20', '10'), ('20', '100'), ('50', '10'), ('50', '100')
PART D: INTERVALS
📊 What Are Intervals?
Intervals = measurements taken under different conditions or at different times.
One test produces multiple rows:
Scenario | Conditions | Result |
|---|---|---|
Temperature sweep | 20°C, 50°C, 100°C | 3 intervals |
Time series | 0s, 30s, 60s, 90s | 4 intervals |
Frequency response | 10 Hz, 100 Hz, 1 kHz | 3 intervals |
⚠️ THE CRITICAL RULE FOR INTERVALS
Any script returning intervals MUST stamp InvId and Lot onto every block. Missing these fields causes front-end validation errors. This is non-negotiable.
Standard Interval Script Pattern
def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
# STEP 1: Resolve inventory from task metadata
inv_id = ""
lot_id = ""
items = task_data.inventory_items if task_data is not None else []
if len(items) == 1:
inv_id = items[0].id or ""
lot_id = items[0].lot_id or ""
# If 0 or 2+ items, leave as empty strings — do NOT guess
# STEP 2: Load file and extract data
df = pd.read_csv(raw_file_path)
out_df = pd.DataFrame(...) # Your extraction logic
# STEP 3: Split into interval blocks
blocks = task_data.split_dataframe_into_intervals(
dataframe=out_df,
interval_columns=["Temperature", "Frequency"],
inventory_id_column=None,
lot_id_column=None,
)
# STEP 4: ⚠️ STAMP InvId and Lot onto EVERY block
for block in blocks:
block["InvId"] = inv_id
block["Lot"] = lot_id
return blocks
Inventory Rules for Intervals
1 inventory item
↓
Stamp its ID and lot_id onto ALL returned blocks
2+ inventory items
↓
DON'T use simple interval stamping
Use multi-inventory approach (see Multi-Inventory section)
Note: Property tasks always have at least 1 inventory item. There is no scenario with 0 inventory.
Interval Value Formatting — Exact String Matching
Critical: Your interval values MUST match Albert's storage format exactly — character for character.
# ❌ WRONG: Stripped trailing zeros
params = {
"Temperature": "50", # Albert stored "50.000"
"Frequency": "1500.5" # Albert stored "1500.500"
}
interval_id = task_data.get_interval_id(params) # FAILS!
# ✅ CORRECT: Match Albert's exact format
params = {
"Temperature": "50.000", # Matches exactly
"Frequency": "1500.500" # Matches exactly
}
interval_id = task_data.get_interval_id(params) # Works!
How to check Albert's format:
1. Look at the payload's workflow_intervals
2. Check what values are stored
3. Match your strings exactly
The formatter should preserve, not strip:
# ❌ WRONG: Strips trailing zeros
def _fmt_float(f: float) -> str:
return "{:.10f}".format(f).rstrip("0").rstrip(".")
# Result: "50" instead of "50.000"
# ✅ CORRECT: Check what Albert expects
# If Albert stores "50.000", keep the zeros
def _fmt_float(f: float, decimal_places: int = 3) -> str:
return "{:.{prec}f}".format(f, prec=decimal_places)
# Result: "50.000" matches Albert's format
Rule: Always verify against the actual interval values in task metadata, not what "seems right".
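One way to verify at runtime rather than guessing — a sketch, assuming workflow_intervals holds the exact strings Albert matches against:
stored = {
    iv.value
    for iv in task_data.workflow_intervals
    if iv.parameter_name == "Temperature"
}
candidate = _fmt_float(50.0)
if candidate not in stored:
    raise ValueError(
        f"Formatted value {candidate!r} is not among stored intervals {sorted(stored)}"
    )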
Building DataFrames for Intervals
Critical: Build output DataFrame from plain Python dicts, NOT from pandas groupby.
# ❌ WRONG
out_df = df.groupby("Temperature").agg(...).reset_index()
# Result: Temperature column contains np.float64 values
# ✅ CORRECT
rows = []
for temp in df["Temperature"].unique():
subset = df[df["Temperature"] == temp]
rows.append({
"Temperature": str(int(temp)), # Plain Python string
"Storage_Modulus": subset["Storage_Modulus"].mean(),
})
out_df = pd.DataFrame(rows)
Examples (output of the zero-stripping formatter):
_fmt_float(10000.0)  # "10000"
_fmt_float(1500.5)   # "1500.5"
_fmt_float(0.00001)  # "0.00001"
Why? split_dataframe_into_intervals() matches values as strings. NumPy types like np.float64 convert to "50.0" but Albert stores "50" → MISMATCH.
PART E: MULTI-INVENTORY
🔀 When Does a Task Have Multiple Inventories?
When task_data.inventory_items has 2 or more entries.
items = task_data.inventory_items
if len(items) == 0:
    pass  # No inventory context (defensive — should not happen for property tasks)
elif len(items) == 1:
    pass  # Single inventory → can stamp onto intervals
elif len(items) >= 2:
    pass  # MULTI-INVENTORY → special handling required
Four Approaches
Approach A: Separate Results Per Inventory
Use when: File has labeled sections (rows 1–10 for Sample A, rows 11–20 for Sample B)
def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
items = task_data.inventory_items if task_data else []
results = []
for item in items:
# Extract data for THIS inventory
item_data = extract_data_for_inventory(item.id, raw_file_path)
# Build result for THIS item
out_df = pd.DataFrame([{
"Comments": None,
"Force": item_data["force"],
"Break_Type": item_data["break_type"],
}])
results.append({
"InvId": item.id,
"Lot": item.lot_id or "",
"Interval": "",
"Data": out_df
})
return results # MULTIPLE result dicts, one per inventory
Result: Returns separate result block for each inventory item.
Approach B: Single Result with Inventory in Data
Use when: File is flat (each row = different sample with IDs embedded)
def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
df = pd.read_csv(raw_file_path)
# Extract all data into one DataFrame
out_data = []
for _, row in df.iterrows():
out_data.append({
"Comments": None,
"Force": _parse_number_or_none(row["Force"]),
"Inventory_ID": row.get("Sample_ID", ""), # Track inventory in data
"Lot_ID": row.get("Lot", ""),
})
out_df = pd.DataFrame(out_data)
return [{
"InvId": "", # No single InvId — all in data
"Lot": "", # No single Lot — all in data
"Interval": "",
"Data": out_df
}]
Result: One result dict. Inventory info lives in data columns.
Approach C: Multi-Inventory + Intervals (Loop Each Separately)
Use when: Each inventory is measured at multiple intervals, file has separate sections per inventory
def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
items = task_data.inventory_items if task_data else []
df = pd.read_csv(raw_file_path)
all_blocks = []
# Process EACH inventory separately
for item in items:
# Filter data for THIS inventory
item_df = df[df["Sample_ID"] == item.id].copy()
        out_df = pd.DataFrame(...)  # Your extraction logic applied to item_df
        # Split THIS inventory's intervals
        blocks = task_data.split_dataframe_into_intervals(
            dataframe=out_df,
interval_columns=["Temperature"],
)
# ⚠️ Stamp THIS inventory's ID onto ITS blocks
for block in blocks:
block["InvId"] = item.id
block["Lot"] = item.lot_id or ""
all_blocks.append(block)
return all_blocks
Result: Multiple blocks (inventories × intervals), each stamped correctly.
Approach D: Multi-Inventory + Intervals (Using Column Parameters) — Most Common
Use when: File is flat with inventory/lot IDs as columns, alongside interval columns.
This is the most efficient approach when your data has inventory info embedded.
def ETL(raw_file_path: str, task_data: TaskMetadata = None) -> list[dict]:
df = pd.read_csv(raw_file_path)
# Your data looks like:
# | Sample_ID | Lot_ID | Temperature | Viscosity | pH |
# | INV001 | LOT001 | 20 | 1000 | 6.5 |
# | INV001 | LOT001 | 50 | 950 | 6.4 |
# | INV002 | LOT002 | 20 | 1050 | 6.6 |
# | INV002 | LOT002 | 50 | 980 | 6.5 |
out_df = pd.DataFrame({
"Sample_ID": df["Sample_ID"],
"Lot_ID": df["Lot_ID"],
"Temperature": df["Temperature"],
"Viscosity": df["Viscosity"],
"pH": df["pH"],
})
# Split by temperature AND auto-populate InvId/Lot from columns
blocks = task_data.split_dataframe_into_intervals(
dataframe=out_df,
interval_columns=["Temperature"], # Split by this
inventory_id_column="Sample_ID", # ← Auto-populate InvId from this column
lot_id_column="Lot_ID", # ← Auto-populate Lot from this column
)
# ⚠️ Still need to stamp if not auto-populated
# (Usually already populated by split_dataframe_into_intervals)
return blocks
Result: Albert automatically:
Creates separate blocks per temperature
Extracts InvId from Sample_ID column
Extracts Lot from Lot_ID column
Groups by unique Sample_ID/Lot_ID combinations
Advantage: No manual looping, no manual stamping — cleaner and less error-prone.
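If you want a safety net for Approach D, a defensive check before returning (a sketch) fails loudly when the split did not populate inventory fields, instead of returning unstamped blocks:
for block in blocks:
    if not block.get("InvId"):
        raise ValueError(
            f"Block for interval {block.get('Interval')} has no InvId; "
            "check the inventory_id_column mapping"
        )
return blocks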
Decision Tree
2+ inventory items?
├─ NO → Use standard single-inventory approach
└─ YES → Multi-inventory task
│
├─ Are intervals involved?
│ │
│ ├─ NO → Use Approach B (single DataFrame, inventory in data)
│ │
│ └─ YES → Data has inventory IDs as columns?
│ │
│ ├─ YES (most common) → Use Approach D
│ │ (Pass inventory_id_column and lot_id_column to split_dataframe_into_intervals)
│ │
│ └─ NO → Use Approach C
│ (Loop each inventory separately, then split)
│
└─ Key rule: Never mix inventory items in a single result block
🚀 The Golden Rule
Never mix inventory items in a single result block.
If 2+ inventories exist and you're returning intervals, loop through each inventory separately. Each returned block should represent one inventory's data only.
Failure mode: Front-end doesn't know which measurements belong to which sample → data loss.
PART F: PITFALLS & FIXES
Pitfall #1: Intervals Without Inventory Info
Symptom: Albert shows validation error "InvId missing on interval block"
Root cause: Script forgot to stamp InvId and Lot onto result blocks.
Fix:
# ❌ WRONG
blocks = task_data.split_dataframe_into_intervals(...)
return blocks
# ✅ CORRECT
blocks = task_data.split_dataframe_into_intervals(...)
for block in blocks:
block["InvId"] = inv_id # Always stamp
block["Lot"] = lot_id # Always stamp
return blocks
Pitfall #2: Hidden DACs Leaking Into Output
Symptom: Extra unwanted columns appear in Data
Root cause: Script included columns with "hidden": true from payload.
Fix:
# ❌ WRONG
TARGET_COLS = [col["Name"] for col in payload["DataColumns"]]
# ✅ CORRECT
TARGET_COLS = [
col["Name"]
for col in payload["DataColumns"]
if not col.get("hidden", False)
]
Pitfall #3: NumPy Types in Interval Columns
Symptom: Intervals don't match DT definitions; show as "unknown"
Root cause: Interval columns contain np.float64 or np.int64 instead of plain Python strings.
Fix:
# ❌ WRONG
out_df = df.groupby("Temperature").agg(...).reset_index()
# ✅ CORRECT
rows = []
for temp in df["Temperature"].unique():
rows.append({"Temperature": str(int(temp)), ...})
out_df = pd.DataFrame(rows)
Pitfall #4: Footer Rows in Excel Files
Symptom: Last row includes legal disclaimers; corrupts numeric data
Root cause: Excel sheets have footer text below the data.
Fix:
# ❌ WRONG
out_df = df.tail(1)
# ✅ CORRECT
numeric_cols = [
c for c in df.columns
if c not in ("Batch Name", "Date", "Time", "Notes")
and not c.startswith("Unnamed")
]
df = df[df[numeric_cols].notna().any(axis=1)]
Pitfall #5: Hardcoded Row Numbers
Symptom: Script works on one file, fails on similar file with different header position
Root cause: Header detection used hardcoded row number.
Fix:
# ❌ WRONG
df = pd.read_csv(raw_file_path, header=2)
# ✅ CORRECT
def _find_header_row(df_raw, required_cells):
for i in range(len(df_raw)):
row_vals = set(str(v).strip() for v in df_raw.iloc[i])
if set(required_cells).issubset(row_vals):
return i
return -1
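Typical usage of the scanner (a sketch): read with header=None so nothing is consumed as column names, locate the header row, then promote it:
raw = pd.read_csv(raw_file_path, header=None)
idx = _find_header_row(raw, ["Temperature", "Viscosity"])
if idx == -1:
    raise ValueError("Header row not found")
df = raw.iloc[idx + 1:].reset_index(drop=True)
df.columns = [str(v).strip() for v in raw.iloc[idx]]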
Pitfall #6: Missing Column → Script Crash
Symptom: Script crashes with KeyError when optional column is missing
Root cause: Code assumes column always exists.
Fix:
# ❌ WRONG
out_data["Yellowness"] = df["Yellowness"].apply(_parse_number_or_none)
# ✅ CORRECT
if "Yellowness" in df.columns:
out_data["Yellowness"] = df["Yellowness"].apply(_parse_number_or_none)
else:
out_data["Yellowness"] = None
Pitfall #7: Enum Value Mismatch
Symptom: Script sets "Pass" but payload expects "PASS"; validation fails
Root cause: Enum value doesn't match payload definition exactly.
Fix:
# ❌ WRONG
out_data["Status"] = df["Status"].str.upper()
# ✅ CORRECT
VALID_STATUS = ["Pass", "Fail", "Conditional"]
def _normalize_status(val):
norm_val = str(val).strip().lower()
for valid in VALID_STATUS:
if valid.lower() == norm_val:
return valid # Return exact payload text
return val # Unrecognized → pass through
out_data["Status"] = df["Status"].apply(_normalize_status)
Pitfall #8: Number Parsing Failures
Symptom: Numeric columns show as None or incorrect values
Root cause: Parser doesn't handle comma decimals, thousands separators, or null tokens.
Fix: Always use the robust _parse_number_or_none() helper.
It handles: "57,95" (European), "1,234.56" (thousands), "1.15 R" (suffix), "N/A" (nulls)
PART G: PRE-DEPLOYMENT CHECKLIST
Use this before every script goes live:
Structural Requirements
[ ] Script has def ETL(raw_file_path: str, task_data: TaskMetadata = None) signature
[ ] Script returns list[dict] with keys: InvId, Lot, Interval, Data
[ ] Helper functions present: _norm(), _parse_number_or_none()
[ ] Header detection scans for names (NOT hardcoded row number)
[ ] Footer/junk rows filtered for Excel files
Data Mapping
[ ] TARGET_COLS derived from non-hidden DACs only
[ ] MAPPING dictionary created and reconciled vs TARGET_COLS
[ ] Comments column always present and set to None
[ ] No hidden DACs in output
[ ] 100% coverage: all non-hidden DACs from payload present in TARGET_COLS
Interval-Specific (If Applicable)
[ ] Interval columns identified and listed
[ ] task_data.split_dataframe_into_intervals() called with correct params
[ ] InvId and Lot stamped onto EVERY returned block
[ ] Interval values formatted as plain Python strings
[ ] No NumPy types in interval columns
[ ] For multi-inventory intervals: separate loop through each inventory
Multi-Inventory (If Applicable)
[ ] Logic to distinguish between 0, 1, and 2+ inventory items
[ ] Inventory IDs extracted from task_data.inventory_items
[ ] If 2+ inventories AND intervals: separate result blocks per inventory
[ ] If 2+ inventories, NO intervals: Approach B or C applied clearly
[ ] If 0 or 2+ inventories: InvId and Lot left as empty strings
Robustness
[ ] All numeric parsing uses _parse_number_or_none()
[ ] Optional columns handled with graceful None filling
[ ] Enum values normalized to match payload exactly
[ ] File delimiters auto-detected (CSV) or specified (Excel engine)
[ ] No crashes on missing columns or unexpected nulls
Testing
[ ] Script tested on actual sample file
[ ] Output DataFrame columns exactly match TARGET_COLS order
[ ] Row counts reasonable (no silent data loss)
[ ] No unexpected None values (validate _parse_number_or_none() is working)
[ ] If interval script: interval blocks each have InvId, Lot populated
APPENDIX A: HANDLING DIFFERENT FILE FORMATS
Overview
Your ETL script receives data via data_buffer: IO (or data_buffer: BytesIO), a file-like object that Albert passes already opened. Different file formats require different parsing approaches.
Key principle: Always read from data_buffer as bytes/text, never assume a file path exists.
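Because the buffer is consumed as you read, rewind with seek(0) before re-reading. A small sketch that peeks at magic bytes to guess the format (.xlsx files are ZIP archives, so they begin with b"PK"):
head = data_buffer.read(4)   # peek at the first bytes
data_buffer.seek(0)          # rewind so the real parser sees the whole file
looks_like_xlsx = head[:2] == b"PK"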
CSV Files (Most Common)
import pandas as pd
from io import BytesIO
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
# Standard CSV read
df = pd.read_csv(data_buffer)
return [{
"InvId": "",
"Lot": "",
"Interval": "",
"Data": df
}]
CSV with Special Delimiters
# Tab-separated
df = pd.read_csv(data_buffer, sep='\t')
# Semicolon-separated
df = pd.read_csv(data_buffer, sep=';')
# Other delimiters
df = pd.read_csv(data_buffer, sep='|')
CSV with Decimal Comma (European Format)
# German/European: "1.234,56" → 1234.56
df = pd.read_csv(data_buffer, decimal=',', sep=';')
# Convert in pandas
df['Column'] = pd.to_numeric(df['Column'], errors='coerce')
CSV with Encoding Issues
# Latin-1 encoding (handles ° and special symbols)
df = pd.read_csv(data_buffer, encoding='latin1')
# ISO-8859-1 (similar to latin1)
df = pd.read_csv(data_buffer, encoding='ISO-8859-1')
# UTF-16 (raw text decoding)
bytes_content = data_buffer.read()
text = bytes_content.decode('UTF-16')
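To hand decoded UTF-16 text back to pandas, wrap it in a StringIO. A sketch — the tab separator is an assumption about the export, not a given:
from io import StringIO

bytes_content = data_buffer.read()
text = bytes_content.decode('UTF-16')        # decode() handles the BOM
df = pd.read_csv(StringIO(text), sep='\t')   # assumed tab-separated export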
Excel Files (.xlsx, .xls)
import pandas as pd
from io import BytesIO
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
# Read Excel (default: first sheet)
df = pd.read_excel(data_buffer)
return [{
"InvId": "",
"Lot": "",
"Interval": "",
"Data": df
}]
Excel with Specific Sheet
# By sheet name
df = pd.read_excel(data_buffer, sheet_name='Sheet2')
# By sheet index (0 = first sheet)
df = pd.read_excel(data_buffer, sheet_name=0)
# All sheets as dict
dfs = pd.read_excel(data_buffer, sheet_name=None)
Excel Without Headers
# No header row
df = pd.read_excel(data_buffer, header=None)
# Set column names manually
df.columns = ['Temperature', 'Viscosity', 'pH']
Excel with Specific Engine
# Openpyxl engine (recommended for modern Excel)
df = pd.read_excel(data_buffer, engine='openpyxl')
# Xlrd engine (older Excel files)
df = pd.read_excel(data_buffer, engine='xlrd')
Text Files (.txt)
Simple Text File
from io import BytesIO
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
# Read as text
text = data_buffer.read().decode('utf-8')
# Parse lines
lines = text.splitlines()
# Extract data manually or use regex
data = []
for line in lines:
if line.strip(): # Skip empty lines
data.append(line.split())
# Convert to DataFrame
df = pd.DataFrame(data, columns=['Col1', 'Col2', 'Col3'])
return [{"InvId": "", "Lot": "", "Interval": "", "Data": df}]
Tab-Separated Text
# Read as CSV with tab delimiter
df = pd.read_csv(data_buffer, sep='\t', header=None)
df.columns = ['Temperature', 'Viscosity', 'pH']
Text with Regex Parsing
import re
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
text = data_buffer.read().decode('utf-8')
# Find all numbers in a pattern
pattern = r'Temperature:\s*(\d+)\s*Viscosity:\s*([\d.]+)'
matches = re.findall(pattern, text)
df = pd.DataFrame(matches, columns=['Temperature', 'Viscosity'])
df = df.astype(float)
return [{"InvId": "", "Lot": "", "Interval": "", "Data": df}]
JSON Files
import json
import pandas as pd
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
# Read JSON
text = data_buffer.read().decode('utf-8')
data = json.loads(text)
# Convert to DataFrame
df = pd.DataFrame(data['results'])
return [{"InvId": "", "Lot": "", "Interval": "", "Data": df}]
JSON Arrays
# If JSON is array of objects: [{"temp": 20, "visc": 1000}, ...]
text = data_buffer.read().decode('utf-8')
data = json.loads(text)
df = pd.DataFrame(data)
Nested JSON
# If JSON has nested structure: {"batch": {"measurements": [...]}}
data = json.loads(text)
measurements = data['batch']['measurements']
df = pd.DataFrame(measurements)
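For deeper nesting, pandas can flatten the records directly; a sketch where the 'batch' → 'measurements' path mirrors the example above:
# Equivalent using pandas' built-in flattener
df = pd.json_normalize(data, record_path=["batch", "measurements"])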
Binary/Proprietary Formats
Handling Unknown Encodings
# Try UTF-8, fall back to latin1
try:
text = data_buffer.read().decode('utf-8')
except UnicodeDecodeError:
data_buffer.seek(0)
text = data_buffer.read().decode('latin1', errors='replace')
Reading Raw Bytes
# For binary formats, read as bytes
bytes_content = data_buffer.read()
# Parse as needed (e.g., struct, numpy, etc.)
import struct
# unpacking depends on your format
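For illustration only — unpacking hypothetical fixed-size records of two little-endian float32 values per row; the real layout depends entirely on your instrument's format:
import struct

RECORD = struct.Struct("<ff")  # hypothetical: Temperature, Viscosity as float32
rows = [
    RECORD.unpack_from(bytes_content, offset)
    for offset in range(0, len(bytes_content) - RECORD.size + 1, RECORD.size)
]
df = pd.DataFrame(rows, columns=["Temperature", "Viscosity"])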
Common Patterns & Best Practices
Pattern 1: Auto-Detect Format
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
    # Try to read as CSV first
    try:
        df = pd.read_csv(data_buffer)
    except Exception:
        # If CSV fails, try Excel
        data_buffer.seek(0)
        try:
            df = pd.read_excel(data_buffer)
        except Exception:
            # Fall back to text parsing
            data_buffer.seek(0)
            text = data_buffer.read().decode('utf-8')
            df = pd.DataFrame(...)  # parse text manually into a DataFrame
return [{"InvId": "", "Lot": "", "Interval": "", "Data": df}]
Pattern 2: Skip Headers and Footers
# Skip first 5 rows (metadata)
df = pd.read_csv(data_buffer, skiprows=5)
# Skip last row (total/notes)
df = df.iloc[:-1]
# Skip rows where column is null
df = df[df['Temperature'].notna()]
Pattern 3: Handle Encoding + Decimals
# Instruments in different regions (US vs EU) export different separators and decimals
df = pd.read_csv(
data_buffer,
encoding='ISO-8859-1', # Handles special characters
decimal=',', # European decimals: 1.234,56
sep=';' # European separator
)
Pattern 4: Multi-Sheet Processing
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
# Read all sheets
all_sheets = pd.read_excel(data_buffer, sheet_name=None)
results = []
for sheet_name, df in all_sheets.items():
# Process each sheet
results.append({
"InvId": "",
"Lot": "",
"Interval": "",
"Data": df
})
return results
Error Handling for File Parsing
def ETL(task_metadata: Any, data_buffer: IO) -> list[dict]:
try:
# Try reading
df = pd.read_csv(data_buffer, encoding='utf-8', decimal='.')
except UnicodeDecodeError:
try:
data_buffer.seek(0)
df = pd.read_csv(data_buffer, encoding='latin1', decimal=',')
except Exception as e:
raise ValueError(f"File encoding/format parsing failed: {str(e)}")
except pd.errors.ParserError as e:
raise ValueError(f"CSV parsing failed - check delimiter/header: {str(e)}")
except Exception as e:
raise ValueError(f"Unexpected file parsing error: {str(e)}")
return [{"InvId": "", "Lot": "", "Interval": "", "Data": df}]
Quick Reference Table
Format | Read Method | Common Issues |
|---|---|---|
CSV | pd.read_csv(data_buffer) | Delimiter, encoding, decimals |
Excel | pd.read_excel(data_buffer) | Sheet name, engine, header row |
Text | data_buffer.read().decode(...) | Encoding, line breaks, parsing |
JSON | json.loads(text) | Nested structure, encoding |
TSV | pd.read_csv(data_buffer, sep='\t') | Header, encoding |
UTF-16 | bytes_content.decode('UTF-16') | Byte order mark (BOM) |
When to Escalate
Escalate to Copilot If:
You don't understand what columns the payload defines
File format is something other than CSV/Excel/PDF
You need to write an interval script but unsure about multi-inventory handling
Number parsing is failing on a weird decimal format
Script runs but output doesn't match expected column names
Self-Troubleshoot If:
Script crashes on missing column → Use if col in df check
Hidden DACs in output → Filter with if not col.get("hidden")
Footer rows corrupting data → Use numeric column filter
NumPy types in interval columns → Build from plain dicts, not groupby
Key Takeaways
Return structure is sacred: Always InvId, Lot, Interval, Data
Intervals need inventory: Every interval block MUST have InvId and Lot stamped
Multi-inventory + intervals = loop each separately: Never mix inventory items in one block
Never trust file structure: Always scan for headers, filter footers, handle missing columns
Parse numbers robustly: Use _parse_number_or_none() for all numerics
Hidden columns are invisible: Filter before building output
Leave intervals to Albert: Your script builds interval columns; Albert creates the Interval field
TaskMetadata is your friend: Use it for inventory, intervals, and validation
Stamp before returning: Always stamp InvId/Lot on interval blocks
Test on real data: Use actual sample files before deployment
