The GraphQL API limits the size of a single operation to 500 KB. To avoid exceeding this limit, use this script to split large CSV files.
This script will take an existing CSV file and create smaller CSV files. It also makes sure that all redactions on a single piece of content are grouped into the same CSV file.
To use the script, run python split_csv.py <input csv>. It will create a directory containing multiple CSV files that adhere to the API limit for payload size.
#!/usr/bin/env python3
"""
Split a large CSV into smaller batch files for bulk redaction.

Splits a CSV file containing sensitive content detections into smaller
batch files that will each fit within the 500KB GraphQL payload limit.
All rows with the same content ARI are kept together in the same batch.

Usage:
    python split_csv.py input.csv

Output:
    Creates a timestamped directory (e.g., csv_batches_2024-01-29_14-30-45/)
    containing batch1.csv, batch2.csv, etc.
"""

import argparse
import csv
import json
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

# Maximum size of a GraphQL operation in bytes
MAX_OPERATION_SIZE_BYTES = 500 * 1024  # 500KB

# Safety margin to account for slight variations
SAFETY_MARGIN = 0.95  # Use 95% of max to be safe

# Placeholder workspace ID for size estimation (actual ID used at submit time)
PLACEHOLDER_WORKSPACE_ID = "ari:cloud:beacon::workspace/00000000-0000-0000-0000-000000000000"

# CSV "Sensitive data field" values mapped to Confluence GraphQL field IDs.
# Values not present here pass through unchanged.
_FIELD_ID_TRANSLATIONS = {
    "confluence_page_body": "body",
    "confluence_page_title": "title",
    "confluence_comment": "body",
    "confluence_blog_body": "body",
    "confluence_blog_title": "title",
}


@dataclass
class ContentLocation:
    """A location within a piece of content: a node pointer plus an index."""

    index: int
    pointer: str


@dataclass
class RedactionItem:
    """One redaction request parsed from a CSV row.

    Required fields mirror the GraphQL input; optional fields are included
    in the payload only when present in the CSV.
    """

    resource_ari: str
    field_id: str
    detection: str
    start: ContentLocation
    end: ContentLocation
    id: Optional[str] = None
    timestamp: Optional[str] = None
    content_hash: Optional[str] = None

    def to_graphql_input(self) -> dict:
        """Convert to GraphQL input format."""
        result = {
            "resourceAri": self.resource_ari,
            "fieldId": self.field_id,
            "detection": self.detection,
            "start": {"index": self.start.index, "pointer": self.start.pointer},
            "end": {"index": self.end.index, "pointer": self.end.pointer},
        }
        # Optional fields: only emitted when the CSV provided a non-empty value.
        if self.id:
            result["id"] = self.id
        if self.timestamp:
            result["timestamp"] = self.timestamp
        if self.content_hash:
            result["contentHash"] = self.content_hash
        return result


def translate_field_id(field_id: str) -> str:
    """Translate CSV values to correct field ID for Confluence.

    Unknown values are returned unchanged.
    """
    # Table lookup replaces the previous if/elif chain; same mapping.
    return _FIELD_ID_TRANSLATIONS.get(field_id, field_id)


def fix_resource_ari(resource_ari: str) -> str:
    """Fix the resource ARI to be a valid ARI.

    For activation-style ARIs, collapses the resource path to
    '<first path segment>/<last path segment>' (e.g. drops intermediate
    segments). Other ARIs are returned unchanged.
    """
    if "activation" in resource_ari:
        components = resource_ari.split(":")
        last_component = components[-1]
        path_components = last_component.split("/")
        # Keep only the first and last path segments of the final component.
        new_last_component = path_components[0] + "/" + path_components[-1]
        components[-1] = new_last_component
        return ":".join(components)
    else:
        return resource_ari


def parse_csv_row(row: dict) -> RedactionItem:
    """Parse a CSV row into a RedactionItem.

    Raises:
        KeyError: if a required column is missing.
        ValueError: if a position column is not an integer.
    """
    return RedactionItem(
        id=row.get("Finding ID"),
        resource_ari=fix_resource_ari(row["Content ARI"]),
        field_id=translate_field_id(row["Sensitive data field"]),
        detection=row["Detection ID"],
        start=ContentLocation(
            index=int(row["Sensitive data location: start position"]),
            pointer=row["Sensitive data location: first node"],
        ),
        end=ContentLocation(
            index=int(row["Sensitive data location: end position"]),
            pointer=row["Sensitive data location: last node"],
        ),
        timestamp=row.get("Updated at"),
        content_hash=row.get("Sensitive data hash"),
    )


def create_mutation_payload(workspace_id: str, redactions: list[dict]) -> dict:
    """Create the GraphQL mutation payload."""
    mutation = """
    mutation BulkRedact($input: ShepherdBulkRedactionInput!) {
      shepherd {
        redaction {
          bulkRedact(input: $input) @optIn(to: "ShepherdBulkRedaction") {
            success
            errors {
              message
              extensions {
                errorType
              }
            }
            jobId
          }
        }
      }
    }
    """
    return {
        "query": mutation,
        "variables": {
            "input": {
                "workspaceId": workspace_id,
                "redactions": redactions,
            }
        },
    }


def load_csv_with_rows(csv_file: Path) -> tuple[list[str], list[dict]]:
    """Load a CSV file and return (fieldnames, rows as dicts)."""
    print(f"Loading {csv_file}...")
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames
        rows = list(reader)
    print(f" Loaded {len(rows)} rows")
    return fieldnames, rows


def group_rows_by_content_ari(rows: list[dict]) -> dict[str, list[dict]]:
    """Group rows by their Content ARI (insertion order preserved)."""
    groups: dict[str, list[dict]] = {}
    for row in rows:
        content_ari = row.get("Content ARI", "")
        groups.setdefault(content_ari, []).append(row)
    return groups


def estimate_payload_size(rows: list[dict]) -> int:
    """Estimate the GraphQL payload size in bytes for a list of rows.

    Rows that fail to parse are silently skipped here; the submission
    script is responsible for surfacing bad rows.
    """
    items = []
    for row in rows:
        try:
            item = parse_csv_row(row)
            items.append(item.to_graphql_input())
        except (KeyError, ValueError):
            # Skip invalid rows in size estimation
            continue
    payload = create_mutation_payload(PLACEHOLDER_WORKSPACE_ID, items)
    return len(json.dumps(payload).encode("utf-8"))


def get_base_payload_size() -> int:
    """Get the base size of an empty mutation payload."""
    return estimate_payload_size([])


def split_into_batches(
    groups: dict[str, list[dict]],
    max_size: int,
) -> list[list[dict]]:
    """Split groups into batches that fit within max_size.

    Each group (all rows sharing a Content ARI) is kept intact. A group
    that on its own exceeds max_size still becomes its own batch, so the
    submission step can report the error for just that content.

    NOTE: batch size is approximated as base + sum of per-group deltas,
    which ignores JSON separators between items; SAFETY_MARGIN absorbs
    that small underestimate.
    """
    batches = []
    current_batch: list[dict] = []
    current_size = get_base_payload_size()
    base_size = current_size

    for content_ari, group_rows in groups.items():
        # Estimate the size this group would add
        group_size = estimate_payload_size(group_rows) - base_size

        # Check if this group alone exceeds the limit
        if base_size + group_size > max_size:
            print(f" Warning: Content ARI group with {len(group_rows)} rows exceeds size limit")
            print(f" ARI: {content_ari}")
            print(f" Size: {(base_size + group_size) / 1024:.1f} KB")
            # Still add it as its own batch - let the submission script handle the error
            if current_batch:
                batches.append(current_batch)
                current_batch = []
                current_size = base_size
            batches.append(group_rows)
            continue

        # Check if adding this group would exceed the limit
        projected_size = current_size + group_size
        if current_batch and projected_size > max_size:
            # Finalize current batch and start a new one
            batches.append(current_batch)
            current_batch = list(group_rows)
            current_size = base_size + group_size
        else:
            # Add to current batch
            current_batch.extend(group_rows)
            current_size = projected_size

    # Don't forget the last batch
    if current_batch:
        batches.append(current_batch)

    return batches


def write_batch_csv(
    batch_rows: list[dict],
    fieldnames: list[str],
    output_path: Path,
) -> None:
    """Write a batch of rows to a CSV file."""
    with open(output_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(batch_rows)


def main():
    """CLI entry point: validate args, load, group, split, and write batches."""
    parser = argparse.ArgumentParser(
        description="Split a large CSV into smaller batch files for bulk redaction",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Split a large CSV into batch files
  python split_csv.py large-file.csv

Output:
  Creates a timestamped directory (e.g., csv_batches_2024-01-29_14-30-45/)
  containing batch1.csv, batch2.csv, etc.

  Each batch file will fit within the 500KB GraphQL payload limit.
  All rows with the same content ARI are kept together in the same batch.
""",
    )
    parser.add_argument(
        "csv_file",
        type=Path,
        help="Input CSV file to split",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=None,
        help="Base output directory (defaults to same directory as input). "
        "A timestamped subdirectory will be created.",
    )
    args = parser.parse_args()

    # Validate input file
    if not args.csv_file.is_file():
        print(f"Error: {args.csv_file} is not a file")
        sys.exit(1)

    # Determine base output directory
    base_dir = args.output_dir if args.output_dir else args.csv_file.parent

    # Create timestamped subdirectory
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_dir = base_dir / f"csv_batches_{timestamp}"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Load CSV
    fieldnames, rows = load_csv_with_rows(args.csv_file)
    if not rows:
        print("No rows to process")
        sys.exit(0)
    if not fieldnames:
        print("Error: Could not read CSV headers")
        sys.exit(1)

    # Group by content ARI
    groups = group_rows_by_content_ari(rows)
    print(f" Grouped into {len(groups)} unique content ARIs")

    # Calculate max size with safety margin
    max_size = int(MAX_OPERATION_SIZE_BYTES * SAFETY_MARGIN)
    print(f"\nSplitting into batches (max {max_size / 1024:.0f} KB per batch)...")

    # Split into batches
    batches = split_into_batches(groups, max_size)

    if len(batches) == 1:
        print("\n CSV fits in a single batch - no splitting needed")
        batch_size = estimate_payload_size(batches[0])
        print(f" Batch 1: {len(batches[0])} rows ({batch_size / 1024:.1f} KB)")
    else:
        print(f"\n Split into {len(batches)} batches:")
        for i, batch in enumerate(batches, 1):
            batch_size = estimate_payload_size(batch)
            print(f" Batch {i}: {len(batch)} rows ({batch_size / 1024:.1f} KB)")

    # Write batch files
    print(f"\nWriting batch files to {output_dir}/")
    output_files = []
    for i, batch in enumerate(batches, 1):
        output_path = output_dir / f"batch{i}.csv"
        write_batch_csv(batch, fieldnames, output_path)
        output_files.append(output_path)
        print(f" Created {output_path.name} ({len(batch)} rows)")

    # Summary
    print(f"\nDone! Created {len(batches)} batch file(s):")
    for f in output_files:
        print(f" {f}")


if __name__ == "__main__":
    main()
Rate this page: