The script accepts two commands.
Use the command python bulk_redact.py submit to submit your redactions. Submissions can contain up to 1000 redactions. If successful, a job ID will be returned that can be used to query the status of the job. A job will usually complete in under 10 minutes but jobs with a large number of redactions can take longer. You can’t submit another job while one is in progress.
To query the status of a job, use the command python bulk_redact.py status, passing the job ID returned by the submit command. Keep in mind that jobs will take longer when you submit a higher number of redactions, or if the job produces lots of errors.
To use the redaction script run the following command, remembering to replace the placeholders with your own details.
python bulk_redact.py submit --csv-file <downloaded CSV file> --workspace-id <workspace ARI> --site-url https://<your-site>.atlassian.net --credentials <email>:<API Token>
If a job is successfully created the script will return the job ID.
Some submitted redactions may fail validation, even if a job is created successfully. To help you understand a failed validation, the errors will be printed in the output of the script along with the job ID.
| Name | Description |
|---|---|
| csv-file | The CSV file you want to import redactions from. |
| workspace-id | The ID of your Guard Detect workspace. See Build your ARI. |
| site-url | The URL of your Atlassian site. |
| credentials | The credentials to use to make the request. Formatted as `<email>:<API Token>`. The user who generated the token must have access to all the content to be redacted. |
Once your job is submitted you can use the following command to check its status.
python bulk_redact.py status --site-url https://<your-site>.atlassian.net --workspace-id <workspace ARI> --job-id <job ID> --credentials <email>:<API Token>
| Name | Description |
|---|---|
| site-url | The URL of your Atlassian site. |
| workspace-id | The ID of your Guard Detect workspace. See Build your ARI. |
| job-id | The job ID returned by the submit command. |
| credentials | The credentials to use to make the request. Formatted as `<email>:<API Token>`. |
#!/usr/bin/env python3
"""
Bulk Redaction Script

Reads a CSV file containing sensitive content detections and submits them
for redaction via the bulkRedact GraphQL mutation.

Commands:
    submit - Submit redaction items from a CSV file
    status - Check status of a redaction job

Usage:
    # Submit redactions from a CSV file
    python atlassian_bulk_redaction.py submit --csv-file <file> --workspace-id <workspace_ari>

    # Check status of a job (copy job ID from submit output)
    python atlassian_bulk_redaction.py status --workspace-id <workspace_ari> --job-id <job_id>

Environment variables:
    BULK_REDACT_SITE_URL    - Atlassian site URL (e.g., https://example.atlassian.net)
    BULK_REDACT_CREDENTIALS - Basic auth credentials (username:api_token)

Note:
    The entire CSV must fit within a single job (max payload ~500KB).
    If the CSV is too large, the script will fail with an error.
"""

import argparse
import base64
import csv
import json
import os
import sys
import urllib.error
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

# Maximum size of a GraphQL operation in bytes
MAX_OPERATION_SIZE_BYTES = 500 * 1024  # 500KB

# Default error message when none provided
DEFAULT_ERROR_MESSAGE = "Unknown error"


@dataclass
class ContentLocation:
    """One boundary (start or end) of a detection inside a content field."""

    # Position value from the CSV "location: start/end position" column.
    index: int
    # Node pointer from the CSV "location: first/last node" column.
    pointer: str


@dataclass
class RedactionItem:
    """A single redaction request parsed from one CSV row."""

    resource_ari: str
    field_id: str
    detection: str
    start: ContentLocation
    end: ContentLocation
    id: Optional[str] = None
    timestamp: Optional[str] = None
    content_hash: Optional[str] = None

    def to_graphql_input(self) -> dict:
        """Convert to GraphQL input format.

        Optional fields are included only when set, so the payload stays
        minimal and no explicit nulls are sent to the API.
        """
        result = {
            "resourceAri": self.resource_ari,
            "fieldId": self.field_id,
            "detection": self.detection,
            "start": {"index": self.start.index, "pointer": self.start.pointer},
            "end": {"index": self.end.index, "pointer": self.end.pointer},
        }
        if self.id:
            result["id"] = self.id
        if self.timestamp:
            result["timestamp"] = self.timestamp
        if self.content_hash:
            result["contentHash"] = self.content_hash
        return result
def translate_field_id(field_id: str) -> str: """Translate CSV values to correct field ID for Confluence.""" if field_id == "confluence_page_body": return "body" elif field_id == "confluence_page_title": return "title" elif field_id == "confluence_comment": return "body" elif field_id == "confluence_blog_body": return "body" elif field_id == "confluence_blog_title": return "title" else: return field_id def fix_resource_ari(resource_ari: str, data_field: str) -> tuple[str, str]: """Fix the resource ARI to be a valid ARI.""" if "activation" in resource_ari: components = resource_ari.split(":") last_component = components[-1] path_components = last_component.split("/") new_last_component = path_components[0] + "/" + path_components[-1] components[-1] = new_last_component resource_ari = ":".join(components) if "comment" in data_field and "jira" in resource_ari: comment_id = data_field.replace("comment", "") if len(comment_id) > 0: data_field = "comment" resource_ari = resource_ari.replace("issue", "issue-comment") resource_ari = resource_ari + "/" + comment_id return resource_ari, data_field def get_data_column(row: dict, suffix: str, required: bool = True) -> Optional[str]: """Get a column value that may start with 'Data' or 'Sensitive data'. Args: row: The CSV row dictionary suffix: The part after "Data " or "Sensitive data " (e.g., "field", "location: start position") required: If True, raises KeyError when column is not found Returns: The column value, or None if not found and not required """ # Try "Data ..." first data_key = f"Data {suffix}" if data_key in row: return row[data_key] # Try "Sensitive data ..." 
as fallback sensitive_key = f"Sensitive data {suffix}" if sensitive_key in row: return row[sensitive_key] if required: raise KeyError(f"Column not found: '{data_key}' or '{sensitive_key}'") return None def parse_csv_row(row: dict) -> RedactionItem: """Parse a CSV row into a RedactionItem.""" resource_ari, data_field = fix_resource_ari(row["Content ARI"], get_data_column(row, "field")) return RedactionItem( id=row.get("Finding ID"), resource_ari=resource_ari, field_id=translate_field_id(data_field), detection=row["Detection ID"], start=ContentLocation( index=int(get_data_column(row, "location: start position")), pointer=get_data_column(row, "location: first node"), ), end=ContentLocation( index=int(get_data_column(row, "location: end position")), pointer=get_data_column(row, "location: last node"), ), timestamp=row.get("Updated at"), content_hash=get_data_column(row, "hash", required=False), ) def load_csv_file(csv_file: Path) -> list[RedactionItem]: """Load a CSV file and return redaction items.""" items = [] print(f"Loading {csv_file}...") with open(csv_file, "r", encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: try: items.append(parse_csv_row(row)) except (KeyError, ValueError) as e: print(f" Warning: Skipping row due to error: {e}") continue print(f" Loaded {len(items)} items") return items def create_mutation_payload(workspace_id: str, redactions: list[dict]) -> dict: """Create the GraphQL mutation payload.""" mutation = """ mutation BulkRedact($input: ShepherdBulkRedactionInput!) { shepherd { redaction { bulkRedact(input: $input) @optIn(to: "ShepherdBulkRedaction") { success errors { message extensions { errorType } } jobId } } } } """ return { "query": mutation, "variables": { "input": { "workspaceId": workspace_id, "redactions": redactions, } } } def create_status_query_payload(workspace_id: str, job_id: str) -> dict: """Create the GraphQL status query payload.""" query = """ query CheckBulkRedactionStatus($workspaceId: ID!, $jobId: ID!) 
{ shepherd { redaction { checkBulkRedactionStatus(workspaceId: $workspaceId, jobId: $jobId) @optIn(to: "ShepherdBulkRedaction") { jobStatus totalRedactions completedRedactions redactionErrors { id status errors } } } } } """ return { "query": query, "variables": { "workspaceId": workspace_id, "jobId": job_id, } } def execute_graphql(api_url: str, credentials: str, payload: dict) -> dict: """Execute a GraphQL request using Basic auth.""" data = json.dumps(payload).encode("utf-8") # Encode credentials for Basic auth encoded_credentials = base64.b64encode(credentials.encode("utf-8")).decode("utf-8") request = urllib.request.Request( api_url, data=data, headers={ "Content-Type": "application/json", "Authorization": f"Basic {encoded_credentials}", }, method="POST" ) try: with urllib.request.urlopen(request, timeout=60) as response: return json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as e: error_body = e.read().decode("utf-8") if e.fp else "" raise RuntimeError(f"HTTP {e.code}: {e.reason}\n{error_body}") except urllib.error.URLError as e: raise RuntimeError(f"URL Error: {e.reason}") def submit_redactions( api_url: str, credentials: str, workspace_id: str, items: list[RedactionItem], verbose: bool = False, ) -> Optional[str]: """Submit items for redaction. 
Returns job_id if successful.""" print(f"\nSubmitting {len(items)} items for redaction...") redactions = [item.to_graphql_input() for item in items] payload = create_mutation_payload(workspace_id, redactions) if verbose: print("Payload items:") print(json.dumps(redactions, indent=2)) try: result = execute_graphql(api_url, credentials, payload) except RuntimeError as e: print(f"ERROR: Request failed: {e}") return None bulk_redact_result = result.get("data", {}).get("shepherd", {}).get("redaction", {}).get("bulkRedact", {}) errors = bulk_redact_result.get("errors", []) if not bulk_redact_result.get("success"): print("Mutation failed!") if errors: print("\nErrors returned from API:") for i, error in enumerate(errors, 1): message = error.get("message", DEFAULT_ERROR_MESSAGE) extensions = error.get("extensions", {}) error_type = extensions.get("errorType", "UNKNOWN") print(f" {i}. [{error_type}] {message}") else: print(" No detailed error information available") return None if (errors): print("Job created, but there were some errors") for i, error in enumerate(errors, 1): message = error.get("message", DEFAULT_ERROR_MESSAGE) extensions = error.get("extensions", {}) error_type = extensions.get("errorType", "UNKNOWN") print(f" {i}. [{error_type}] {message}") else: print("Mutation succeeded with no errors") job_id = bulk_redact_result.get("jobId") redaction_ids = bulk_redact_result.get("redactionIds", []) if (len(redaction_ids) > 0): print(f"Redaction IDs: {', '.join(redaction_ids)}") print(f"Success! Job ID: {job_id}") return job_id def check_job_status_once( api_url: str, credentials: str, workspace_id: str, job_id: str, ) -> tuple[str, bool, int, int]: """Check job status once (no polling). 
Returns (status, success, completed, total).""" print(f"\nChecking status for job {job_id}...") payload = create_status_query_payload(workspace_id, job_id) try: result = execute_graphql(api_url, credentials, payload) except RuntimeError as e: print(f" Error: {e}") return "ERROR", False, 0, 0 if "errors" in result: print(" Request failed with GraphQL errors:") for i, error in enumerate(result["errors"], 1): message = error.get("message", DEFAULT_ERROR_MESSAGE) path = error.get("path", []) print(f" {i}. {message}") if path: print(f" Path: {' > '.join(str(p) for p in path)}") return "ERROR", False, 0, 0 status_result = ( result.get("data", {}) .get("shepherd", {}) .get("redaction", {}) .get("checkBulkRedactionStatus", {}) ) job_status = status_result.get("jobStatus", "UNKNOWN") total_redactions = status_result.get("totalRedactions", 0) completed_redactions = status_result.get("completedRedactions", 0) redaction_errors = status_result.get("redactionErrors", []) progress = f"{completed_redactions}/{total_redactions}" if total_redactions else "0/?" 
print(f" Status: {job_status} ({progress} redactions completed)") if redaction_errors: failed = [e for e in redaction_errors if e.get("status") == "FAILED"] if failed: print(f" Failed items: {len(failed)}") for error in failed: print(f" - {error.get('id')}: {error.get('errors')}") success = job_status in ("SUCCEEDED", "PARTIALLY_SUCCEEDED") return job_status, success, completed_redactions, total_redactions def run_status(args) -> None: """Run the status check command.""" if not args.site_url: print("Error: --site-url or BULK_REDACT_SITE_URL environment variable is required") sys.exit(1) if not args.credentials: print("Error: --credentials or BULK_REDACT_CREDENTIALS environment variable is required") sys.exit(1) # Build the full API URL from the site URL api_url = args.site_url.rstrip("/") + "/gateway/api/graphql" status, success, _, _ = check_job_status_once( api_url, args.credentials, args.workspace_id, args.job_id, ) # Exit with appropriate code if success: sys.exit(0) elif status == "IN_PROGRESS": sys.exit(0) # Still running is not an error else: sys.exit(1) def run_submit(args) -> None: """Run the submit command.""" # Validate required args (unless dry run) if not args.dry_run: if not args.site_url: print("Error: --site-url or BULK_REDACT_SITE_URL environment variable is required") sys.exit(1) if not args.credentials: print("Error: --credentials or BULK_REDACT_CREDENTIALS environment variable is required") sys.exit(1) # Build the full API URL from the site URL api_url = args.site_url.rstrip("/") + "/gateway/api/graphql" if args.site_url else None # Load items from CSV if not args.csv_file.is_file(): print(f"Error: {args.csv_file} is not a file") sys.exit(1) items = load_csv_file(args.csv_file) if not items: print("No items to process") sys.exit(0) print(f"\nTotal items to redact: {len(items)}") # Build payload and check size redactions = [item.to_graphql_input() for item in items] payload = create_mutation_payload(args.workspace_id, redactions) payload_size = 
len(json.dumps(payload).encode("utf-8")) max_size_kb = MAX_OPERATION_SIZE_BYTES / 1024 payload_size_kb = payload_size / 1024 print(f"Payload size: {payload_size_kb:.1f} KB (max: {max_size_kb:.0f} KB)") # Check if payload exceeds maximum size if payload_size > MAX_OPERATION_SIZE_BYTES: print(f"\nERROR: Payload size ({payload_size_kb:.1f} KB) exceeds maximum allowed ({max_size_kb:.0f} KB)") print(f"The CSV contains {len(items)} items which is too large for a single job.") print("Please reduce the number of items in your CSV file and try again.") sys.exit(1) if args.dry_run: print("\nDry run - not submitting request") if args.verbose: print("Payload items:") print(json.dumps(redactions, indent=2)) sys.exit(0) # Submit the job job_id = submit_redactions( api_url, args.credentials, args.workspace_id, items, verbose=args.verbose, ) if job_id: print(f"\nCopy this Job ID to check status later: {job_id}") sys.exit(0) else: sys.exit(1) def main(): parser = argparse.ArgumentParser( description="Bulk redaction script for sensitive content", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Environment variables: BULK_REDACT_SITE_URL Atlassian site URL (e.g., https://example.atlassian.net) BULK_REDACT_CREDENTIALS Basic auth credentials (username:api_token) Examples: # Submit redactions from a CSV file python atlassian_bulk_redaction.py submit --csv-file ./data.csv --workspace-id "ari:cloud:..." # Check status of a job (use job ID from submit output) python atlassian_bulk_redaction.py status --workspace-id "ari:cloud:..." 
--job-id "abc123" """, ) subparsers = parser.add_subparsers(dest="command", help="Available commands") # Common arguments for both commands common_parser = argparse.ArgumentParser(add_help=False) common_parser.add_argument( "--site-url", default=os.environ.get("BULK_REDACT_SITE_URL"), help="Atlassian site URL, e.g., https://example.atlassian.net (or set BULK_REDACT_SITE_URL env var)", ) common_parser.add_argument( "--credentials", default=os.environ.get("BULK_REDACT_CREDENTIALS"), help="Basic auth credentials as username:api_token (or set BULK_REDACT_CREDENTIALS env var)", ) # Submit command submit_parser = subparsers.add_parser( "submit", parents=[common_parser], help="Submit redaction items from a CSV file", formatter_class=argparse.RawDescriptionHelpFormatter, ) submit_parser.add_argument( "--csv-file", type=Path, required=True, help="CSV file to process", ) submit_parser.add_argument( "--workspace-id", required=True, help="Workspace ARI (e.g., ari:cloud:beacon::workspace/...)", ) submit_parser.add_argument( "--dry-run", action="store_true", help="Check payload size without submitting", ) submit_parser.add_argument( "-v", "--verbose", action="store_true", help="Print the items being sent in the payload (pretty printed)", ) # Status command status_parser = subparsers.add_parser( "status", parents=[common_parser], help="Check status of a redaction job", formatter_class=argparse.RawDescriptionHelpFormatter, ) status_parser.add_argument( "--workspace-id", required=True, help="Workspace ARI", ) status_parser.add_argument( "--job-id", required=True, help="Job ID to check (from submit command output)", ) args = parser.parse_args() if args.command is None: parser.print_help() sys.exit(1) if args.command == "submit": run_submit(args) elif args.command == "status": run_status(args) if __name__ == "__main__": main()
Rate this page: