import csv
import io
import os
from datetime import datetime, UTC
from pathlib import Path
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.core.credentials import AzureKeyCredential
from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv
# Load variables from the ".env" file located next to this script before any
# of the settings below are read from the process environment.
load_dotenv(Path(__file__).resolve().with_name(".env"))

# Required settings: a missing variable raises KeyError at import time.
DI_ENDPOINT = os.environ["AZURE_DI_ENDPOINT"]
DI_KEY = os.environ["AZURE_DI_KEY"]
STORAGE_CONN_STR = os.environ["AZURE_STORAGE_CONNECTION_STRING"]

# Optional settings: container names fall back to "input" / "output".
INPUT_CONTAINER = os.environ.get("AZURE_STORAGE_INPUT_CONTAINER", "input")
OUTPUT_CONTAINER = os.environ.get("AZURE_STORAGE_OUTPUT_CONTAINER", "output")

# Kept as a tuple because it is handed directly to str.endswith() in main().
SUPPORTED_EXTENSIONS = (
    ".pdf",
    ".jpg",
    ".jpeg",
    ".png",
    ".tiff",
    ".bmp",
)

# Column order of the generated CSV; every row dict uses exactly these keys.
CSV_FIELDS = [
    "source",
    "vendor",
    "invoice_date",
    "line_description",
    "quantity",
    "unit_price",
    "line_amount",
    "invoice_total",
]
def analyze_invoice(di_client, pdf_bytes):
    """Run the ``prebuilt-invoice`` model over raw document bytes.

    Args:
        di_client: Client object exposing ``begin_analyze_document``.
        pdf_bytes: Raw file content, submitted as ``application/octet-stream``.
            Despite the name, ``main`` passes the bytes of any blob whose
            extension is in ``SUPPORTED_EXTENSIONS``.

    Returns:
        Whatever the poller's ``result()`` call returns.
    """
    request_options = {
        "body": pdf_bytes,
        "content_type": "application/octet-stream",
    }
    operation = di_client.begin_analyze_document(
        "prebuilt-invoice", **request_options
    )
    # NOTE(review): result() presumably blocks until the service has finished
    # analysing the document -- confirm against the SDK documentation.
    return operation.result()
def _text_of(field):
    """Return ``field.value_string``, or "" when the field or its value is missing."""
    if not field:
        return ""
    # A field can be present while its string value is None; normalise that
    # to "" so every column holds a str, like the numeric/currency columns.
    return field.value_string or ""


def _amount_of(field):
    """Return ``str(field.value_currency.amount)``, or "" when unavailable."""
    if field and field.value_currency:
        return str(field.value_currency.amount)
    return ""


def _make_row(header, description="", quantity="", unit_price="", amount=""):
    """Combine invoice-level values with one line item's values into a CSV row."""
    return {
        "source": header["source"],
        "vendor": header["vendor"],
        "invoice_date": header["invoice_date"],
        "line_description": description,
        "quantity": quantity,
        "unit_price": unit_price,
        "line_amount": amount,
        "invoice_total": header["invoice_total"],
    }


def extract_rows(result, source_name: str) -> list[dict[str, str]]:
    """Flatten an invoice analysis result into CSV-ready row dicts.

    One row is produced per entry of each document's ``Items`` array. A
    document with no items still yields a single row carrying only the
    invoice-level columns, with the line-item columns left blank.

    Args:
        result: Analysis result exposing ``documents``; each document exposes
            ``fields`` (a mapping looked up by "VendorName", "InvoiceDate",
            "InvoiceTotal" and "Items").
        source_name: Value stored in the "source" column of every row.

    Returns:
        A list of dicts with the keys "source", "vendor", "invoice_date",
        "line_description", "quantity", "unit_price", "line_amount" and
        "invoice_total". All values are strings; missing data becomes "".
        The list is empty when ``result.documents`` is ``None`` or empty.
    """
    rows = []
    # documents may be None when nothing was recognised; treat it as empty
    # instead of failing with "'NoneType' object is not iterable".
    for doc in result.documents or []:
        fields = doc.fields or {}

        date_field = fields.get("InvoiceDate")
        has_date = bool(date_field and date_field.value_date)
        header = {
            "source": source_name,
            "vendor": _text_of(fields.get("VendorName")),
            "invoice_date": str(date_field.value_date) if has_date else "",
            "invoice_total": _amount_of(fields.get("InvoiceTotal")),
        }

        items_field = fields.get("Items")
        items = (items_field.value_array or []) if items_field else []
        if not items:
            rows.append(_make_row(header))
            continue

        for item in items:
            item_fields = item.value_object or {}
            qty_field = item_fields.get("Quantity")
            # Compare against None explicitly so a quantity of 0 is kept.
            has_qty = bool(qty_field and qty_field.value_number is not None)
            rows.append(
                _make_row(
                    header,
                    description=_text_of(item_fields.get("Description")),
                    quantity=str(qty_field.value_number) if has_qty else "",
                    unit_price=_amount_of(item_fields.get("UnitPrice")),
                    amount=_amount_of(item_fields.get("Amount")),
                )
            )
    return rows
def main():
    """Analyse every supported blob in the input container and upload one CSV.

    Steps:
        1. List blobs in ``INPUT_CONTAINER`` whose lower-cased name ends with
           one of ``SUPPORTED_EXTENSIONS``; stop if there are none.
        2. Download each blob, analyse it with ``analyze_invoice`` and flatten
           the result with ``extract_rows``. A failure on one blob is printed
           and recorded; processing continues with the next blob.
        3. Write all collected rows as CSV (header from ``CSV_FIELDS``) to a
           blob named ``invoices-<UTC timestamp>.csv`` in ``OUTPUT_CONTAINER``.
           Nothing is uploaded when no rows were extracted.
    """
    credential = AzureKeyCredential(DI_KEY)
    di_client = DocumentIntelligenceClient(DI_ENDPOINT, credential)
    storage = BlobServiceClient.from_connection_string(STORAGE_CONN_STR)
    source_container = storage.get_container_client(INPUT_CONTAINER)
    target_container = storage.get_container_client(OUTPUT_CONTAINER)

    candidates = []
    for entry in source_container.list_blobs():
        if entry.name.lower().endswith(SUPPORTED_EXTENSIONS):
            candidates.append(entry)
    if not candidates:
        print(f"No invoice files found in container '{INPUT_CONTAINER}'.")
        return

    collected = []
    failures = []
    for entry in candidates:
        name = entry.name
        print(f"Processing: {name}")
        # Top-level boundary: any error on a single blob is reported and the
        # run moves on, so one bad file does not abort the whole batch.
        try:
            payload = source_container.download_blob(name).readall()
            analysis = analyze_invoice(di_client, payload)
            extracted = extract_rows(analysis, name)
            collected.extend(extracted)
            print(f" -> {len(extracted)} row(s) extracted")
        except Exception as exc:
            print(f" ERROR: {exc}")
            failures.append(name)

    if failures:
        print(f"\nFailed ({len(failures)}): {', '.join(failures)}")
    if not collected:
        print("No data extracted.")
        return

    # Build the CSV in memory, then upload it as a single blob.
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=CSV_FIELDS)
    writer.writeheader()
    for row in collected:
        writer.writerow(row)

    output_blob_name = f"invoices-{datetime.now(UTC):%Y%m%d-%H%M%S}.csv"
    target_container.upload_blob(
        output_blob_name, buffer.getvalue(), overwrite=True
    )
    print(f"\nResults written to '{OUTPUT_CONTAINER}/{output_blob_name}'")
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()