Developer
News and Updates
Get Support
Sign in
Get Support
Sign in
DOCUMENTATION
Cloud
Data Center
Resources
Sign in
Sign in
DOCUMENTATION
Cloud
Data Center
Resources
Sign in
Last updated Jan 30, 2026

Preparing your data for submission

The GraphQL API limits the size of an operation to 500 KB. To avoid exceeding this limit, use this script to split large CSV files before submission.

This script will take an existing CSV file and create smaller CSV files. It also makes sure that all redactions on a single piece of content are grouped into the same CSV file.

To use the script, run python split_csv.py <input csv>. It will create a directory containing multiple CSV files that adhere to the API limit for payload size.

Script

1
2
#!/usr/bin/env python3
"""
Split a large CSV into smaller batch files for bulk redaction.

Splits a CSV file containing sensitive content detections into smaller batch
files that will each fit within the 500KB GraphQL payload limit. All rows
with the same content ARI are kept together in the same batch.

Usage:
    python split_csv.py input.csv

Output:
    Creates a timestamped directory (e.g., csv_batches_2024-01-29_14-30-45/) containing
    batch1.csv, batch2.csv, etc.
"""

import argparse
import csv
import json
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

# Maximum size of a GraphQL operation in bytes (payloads above this are rejected)
MAX_OPERATION_SIZE_BYTES = 500 * 1024  # 500KB

# Safety margin to account for slight variations between the estimated payload
# size (built with the placeholder workspace ID) and the real submission
SAFETY_MARGIN = 0.95  # Use 95% of max to be safe

# Placeholder workspace ID for size estimation (actual ID used at submit time).
# NOTE(review): assumed to be the same length as a real workspace ARI (UUID-shaped)
# so estimates track actual payload sizes — confirm against the submission script.
PLACEHOLDER_WORKSPACE_ID = "ari:cloud:beacon::workspace/00000000-0000-0000-0000-000000000000"


@dataclass
class ContentLocation:
    """One boundary (start or end) of a sensitive-data detection.

    ``index`` is the position within the field and ``pointer`` identifies the
    document node, per the CSV's "position" and "node" columns.
    """

    index: int
    pointer: str


@dataclass
class RedactionItem:
    """A single redaction request parsed from one CSV row."""

    resource_ari: str
    field_id: str
    detection: str
    start: ContentLocation
    end: ContentLocation
    id: Optional[str] = None
    timestamp: Optional[str] = None
    content_hash: Optional[str] = None

    def to_graphql_input(self) -> dict:
        """Convert to GraphQL input format."""
        payload = {
            "resourceAri": self.resource_ari,
            "fieldId": self.field_id,
            "detection": self.detection,
            "start": {"index": self.start.index, "pointer": self.start.pointer},
            "end": {"index": self.end.index, "pointer": self.end.pointer},
        }
        # Optional attributes are emitted only when truthy, so empty CSV cells
        # never appear in the GraphQL input.
        optional_fields = (
            ("id", self.id),
            ("timestamp", self.timestamp),
            ("contentHash", self.content_hash),
        )
        for key, value in optional_fields:
            if value:
                payload[key] = value
        return payload


# Maps CSV "Sensitive data field" values to the field IDs Confluence expects.
_CONFLUENCE_FIELD_IDS = {
    "confluence_page_body": "body",
    "confluence_page_title": "title",
    "confluence_comment": "body",
    "confluence_blog_body": "body",
    "confluence_blog_title": "title",
}


def translate_field_id(field_id: str) -> str:
    """Translate CSV values to correct field ID for Confluence.

    Unrecognized values are returned unchanged so non-Confluence fields
    pass through untouched.
    """
    return _CONFLUENCE_FIELD_IDS.get(field_id, field_id)


def fix_resource_ari(resource_ari: str) -> str:
    """Fix the resource ARI to be a valid ARI.

    For activation-style ARIs, the final (path) component is collapsed to its
    first and last segments; all other ARIs pass through unchanged.
    """
    if "activation" not in resource_ari:
        return resource_ari
    segments = resource_ari.split(":")
    path_parts = segments[-1].split("/")
    segments[-1] = f"{path_parts[0]}/{path_parts[-1]}"
    return ":".join(segments)


def parse_csv_row(row: dict) -> RedactionItem:
    """Parse a CSV row into a RedactionItem.

    Raises KeyError for missing required columns and ValueError for
    non-integer position values.
    """
    start_location = ContentLocation(
        index=int(row["Sensitive data location: start position"]),
        pointer=row["Sensitive data location: first node"],
    )
    end_location = ContentLocation(
        index=int(row["Sensitive data location: end position"]),
        pointer=row["Sensitive data location: last node"],
    )
    return RedactionItem(
        id=row.get("Finding ID"),
        resource_ari=fix_resource_ari(row["Content ARI"]),
        field_id=translate_field_id(row["Sensitive data field"]),
        detection=row["Detection ID"],
        start=start_location,
        end=end_location,
        timestamp=row.get("Updated at"),
        content_hash=row.get("Sensitive data hash"),
    )


def create_mutation_payload(workspace_id: str, redactions: list[dict]) -> dict:
    """Create the GraphQL mutation payload.

    Returns a dict with the fixed bulk-redaction mutation under "query" and
    the workspace ID plus redaction inputs under "variables".
    """
    mutation = """
mutation BulkRedact($input: ShepherdBulkRedactionInput!) {
  shepherd {
    redaction {
      bulkRedact(input: $input) @optIn(to: "ShepherdBulkRedaction") {
        success
        errors {
          message
          extensions {
            errorType
          }
        }
        jobId
      }
    }
  }
}
"""
    variables = {
        "input": {
            "workspaceId": workspace_id,
            "redactions": redactions,
        }
    }
    return {"query": mutation, "variables": variables}


def load_csv_with_rows(csv_file: Path) -> tuple[list[str], list[dict]]:
    """Load a CSV file and return (fieldnames, rows as dicts).

    Fieldnames come from the header row (None for an empty file, matching
    csv.DictReader semantics).
    """
    print(f"Loading {csv_file}...")
    with open(csv_file, "r", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        header = reader.fieldnames
        records = list(reader)
    print(f"  Loaded {len(records)} rows")
    return header, records


def group_rows_by_content_ari(rows: list[dict]) -> dict[str, list[dict]]:
    """Group rows by their Content ARI.

    Rows missing the "Content ARI" column fall into the "" group. Groups
    preserve first-appearance order (plain dict insertion order), which the
    batching step relies on for deterministic output.
    """
    groups: dict[str, list[dict]] = {}
    for row in rows:
        # setdefault avoids the membership-test-then-index double lookup.
        groups.setdefault(row.get("Content ARI", ""), []).append(row)
    return groups


def estimate_payload_size(rows: list[dict]) -> int:
    """Estimate the GraphQL payload size for a list of rows.

    Returns the UTF-8 byte length of the serialized mutation built with the
    placeholder workspace ID. Rows that fail to parse are ignored here; the
    submission step is expected to surface those errors.
    """
    graphql_items = []
    for row in rows:
        try:
            graphql_items.append(parse_csv_row(row).to_graphql_input())
        except (KeyError, ValueError):
            # Skip invalid rows in size estimation
            continue

    payload = create_mutation_payload(PLACEHOLDER_WORKSPACE_ID, graphql_items)
    return len(json.dumps(payload).encode("utf-8"))


def get_base_payload_size() -> int:
    """Get the base size of an empty mutation payload.

    This is the fixed overhead every batch pays (mutation text plus variable
    scaffolding) before any redaction rows are added.
    """
    return estimate_payload_size([])


def split_into_batches(
    groups: dict[str, list[dict]],
    max_size: int,
) -> list[list[dict]]:
    """Split groups into batches that fit within max_size.

    Greedy first-fit in the groups' iteration order: each Content ARI group is
    kept whole and appended to the current batch while the projected payload
    size stays within ``max_size``. A group that alone exceeds the limit still
    becomes its own batch (after a warning) so no rows are silently dropped.
    """
    batches = []
    current_batch: list[dict] = []
    # Every batch starts at the empty-payload size (mutation text plus
    # scaffolding), not zero.
    current_size = get_base_payload_size()
    base_size = current_size

    for content_ari, group_rows in groups.items():
        # Estimate the size this group would add; subtract base_size because
        # estimate_payload_size includes the fixed overhead in its result.
        group_size = estimate_payload_size(group_rows) - base_size

        # Check if this group alone exceeds the limit
        if base_size + group_size > max_size:
            print(f"  Warning: Content ARI group with {len(group_rows)} rows exceeds size limit")
            print(f"    ARI: {content_ari}")
            print(f"    Size: {(base_size + group_size) / 1024:.1f} KB")
            # Still add it as its own batch - let the submission script handle the error
            if current_batch:
                batches.append(current_batch)
                current_batch = []
                current_size = base_size
            batches.append(group_rows)
            continue

        # Check if adding this group would exceed the limit
        projected_size = current_size + group_size
        if current_batch and projected_size > max_size:
            # Finalize current batch and start a new one seeded with this group
            batches.append(current_batch)
            current_batch = list(group_rows)
            current_size = base_size + group_size
        else:
            # Add to current batch
            current_batch.extend(group_rows)
            current_size = projected_size

    # Don't forget the last batch
    if current_batch:
        batches.append(current_batch)

    return batches


def write_batch_csv(
    batch_rows: list[dict],
    fieldnames: list[str],
    output_path: Path,
) -> None:
    """Write a batch of rows to a CSV file."""
    with open(output_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(batch_rows)


def main():
    """Entry point: parse arguments, split the input CSV, and write batch files.

    Exits 1 on a missing input file or unreadable headers; exits 0 early when
    the input has no data rows.
    """
    parser = argparse.ArgumentParser(
        description="Split a large CSV into smaller batch files for bulk redaction",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Split a large CSV into batch files
  python split_csv.py large-file.csv

Output:
  Creates a timestamped directory (e.g., csv_batches_2024-01-29_14-30-45/) containing
  batch1.csv, batch2.csv, etc. Each batch file will fit within the 500KB
  GraphQL payload limit. All rows with the same content ARI are kept together
  in the same batch.
        """,
    )

    parser.add_argument(
        "csv_file",
        type=Path,
        help="Input CSV file to split",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=None,
        help="Base output directory (defaults to same directory as input). A timestamped subdirectory will be created.",
    )

    args = parser.parse_args()

    # Validate input file
    if not args.csv_file.is_file():
        print(f"Error: {args.csv_file} is not a file")
        sys.exit(1)

    # Determine base output directory
    base_dir = args.output_dir if args.output_dir else args.csv_file.parent

    # Create timestamped subdirectory so repeated runs never overwrite earlier output
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_dir = base_dir / f"csv_batches_{timestamp}"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Load CSV
    fieldnames, rows = load_csv_with_rows(args.csv_file)

    if not rows:
        print("No rows to process")
        sys.exit(0)

    if not fieldnames:
        print("Error: Could not read CSV headers")
        sys.exit(1)

    # Group by content ARI so all findings for one document land in one batch
    groups = group_rows_by_content_ari(rows)
    print(f"  Grouped into {len(groups)} unique content ARIs")

    # Calculate max size with safety margin
    max_size = int(MAX_OPERATION_SIZE_BYTES * SAFETY_MARGIN)
    print(f"\nSplitting into batches (max {max_size / 1024:.0f} KB per batch)...")

    # Split into batches
    batches = split_into_batches(groups, max_size)

    if len(batches) == 1:
        print("\n  CSV fits in a single batch - no splitting needed")
        batch_size = estimate_payload_size(batches[0])
        print(f"  Batch 1: {len(batches[0])} rows ({batch_size / 1024:.1f} KB)")
    else:
        print(f"\n  Split into {len(batches)} batches:")
        for i, batch in enumerate(batches, 1):
            batch_size = estimate_payload_size(batch)
            print(f"  Batch {i}: {len(batch)} rows ({batch_size / 1024:.1f} KB)")

    # Write batch files
    print(f"\nWriting batch files to {output_dir}/")
    output_files = []
    for i, batch in enumerate(batches, 1):
        output_path = output_dir / f"batch{i}.csv"
        write_batch_csv(batch, fieldnames, output_path)
        output_files.append(output_path)
        print(f"  Created {output_path.name} ({len(batch)} rows)")

    # Summary
    print(f"\nDone! Created {len(batches)} batch file(s):")
    for f in output_files:
        print(f"  {f}")


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()

Rate this page: