Last updated Jan 30, 2026

Using the bulk redaction example script

The script accepts two commands.

Use the command python bulk_redact.py submit to submit your redactions. Submissions can contain up to 1000 redactions. If successful, a job ID will be returned that can be used to query the status of the job. A job will usually complete in under 10 minutes but jobs with a large number of redactions can take longer. You can’t submit another job while one is in progress.
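Because a submission holds at most 1000 redactions, a larger export has to be split before it is submitted. The following is a minimal sketch and not part of the example script: `chunk_rows` and `write_csv` are hypothetical helpers that group CSV rows into batches of up to 1000 and turn a batch back into CSV text. Since only one job can run at a time, each batch would need to be submitted after the previous job has finished.

```python
import csv
import io
from typing import Iterator

MAX_REDACTIONS_PER_JOB = 1000  # limit stated in the paragraph above


def chunk_rows(rows: list[dict], size: int = MAX_REDACTIONS_PER_JOB) -> Iterator[list[dict]]:
    """Yield consecutive batches of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def write_csv(rows: list[dict]) -> str:
    """Serialise one batch back to CSV text, keeping the original column names."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Made-up rows standing in for a downloaded CSV with 2500 findings
rows = [{"Finding ID": str(i)} for i in range(2500)]
batches = list(chunk_rows(rows))
```

Each batch could then be written to its own file and passed to the submit command in turn.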

To query the status of a job, use the command python bulk_redact.py status. Keep in mind that jobs take longer when you submit a higher number of redactions, or if the job produces lots of errors.
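Since a job can take several minutes, checking its status is usually a matter of polling. Below is a rough sketch of such a loop. `wait_for_job` is a hypothetical helper, not something the example script provides; the status names `IN_PROGRESS` and `SUCCEEDED` are taken from the script, while the 30-second interval and the limit of 40 checks are arbitrary assumptions.

```python
import time
from typing import Callable


def wait_for_job(
    check_status: Callable[[], str],
    interval_seconds: float = 30.0,  # assumed polling interval
    max_checks: int = 40,            # assumed upper bound on attempts
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call check_status until it returns something other than IN_PROGRESS."""
    status = "UNKNOWN"
    for attempt in range(max_checks):
        status = check_status()
        if status != "IN_PROGRESS":
            return status
        # Do not sleep after the final attempt
        if attempt < max_checks - 1:
            sleep(interval_seconds)
    return status


# Demo with canned statuses and a no-op sleep, so nothing is actually queried
fake_statuses = iter(["IN_PROGRESS", "IN_PROGRESS", "SUCCEEDED"])
final_status = wait_for_job(lambda: next(fake_statuses), sleep=lambda seconds: None)
```

In real use, `check_status` would wrap whatever call returns the job status, for example a function that runs the status query and returns its `jobStatus` value.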

Submit the contents of a CSV

To use the redaction script, run the following command, replacing the placeholders with your own details.

python bulk_redact.py submit --csv-file <downloaded CSV file> --workspace-id <workspace ARI> --site-url https://<your-site>.atlassian.net --credentials <email>:<API Token>
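The script also reads the site URL and credentials from the `BULK_REDACT_SITE_URL` and `BULK_REDACT_CREDENTIALS` environment variables, and accepts a `--dry-run` flag that checks the payload size without submitting. As a sketch of how the command could be assembled from Python without putting credentials on the command line, the helpers below (`build_submit_command` and `build_environment`, both hypothetical) build the argument list and environment. The file name `findings.csv` is a placeholder, and the script is assumed to be saved as `bulk_redact.py`.

```python
import os
import sys


def build_submit_command(csv_file: str, workspace_id: str, dry_run: bool = False) -> list[str]:
    """Build the argument list for the submit command."""
    command = [
        sys.executable, "bulk_redact.py", "submit",
        "--csv-file", csv_file,
        "--workspace-id", workspace_id,
    ]
    if dry_run:
        command.append("--dry-run")
    return command


def build_environment(site_url: str, credentials: str) -> dict[str, str]:
    """Copy the current environment and add the variables the script reads."""
    env = dict(os.environ)
    env["BULK_REDACT_SITE_URL"] = site_url
    env["BULK_REDACT_CREDENTIALS"] = credentials
    return env


command = build_submit_command("findings.csv", "<workspace ARI>", dry_run=True)
env = build_environment("https://<your-site>.atlassian.net", "<email>:<API Token>")
# To run it: subprocess.run(command, env=env, check=True)
```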

If a job is successfully created the script will return the job ID.

Some submitted redactions may fail validation, even if a job is created successfully. To help you understand a failed validation, the errors will be printed in the output of the script along with the job ID.
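To illustrate what that output looks like, here is a small sketch that formats a list of errors the same way the script does, as a numbered line with the error type in brackets followed by the message. `format_errors` is a hypothetical helper, and the sample messages and error types are made up for illustration; the real values come from the API.

```python
def format_errors(errors: list[dict]) -> list[str]:
    """Format API errors as numbered '[errorType] message' lines."""
    lines = []
    for number, error in enumerate(errors, 1):
        message = error.get("message", "Unknown error")
        # "extensions" may be missing or null, so fall back to an empty dict
        error_type = (error.get("extensions") or {}).get("errorType", "UNKNOWN")
        lines.append(f"{number}. [{error_type}] {message}")
    return lines


# Made-up entries shaped like the `errors` field the script requests
sample_errors = [
    {"message": "Content not found", "extensions": {"errorType": "NOT_FOUND"}},
    {"message": "Something went wrong"},
]
formatted = format_errors(sample_errors)
```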

Arguments

| Name | Description |
| --- | --- |
| csv-file | The CSV file you want to import redactions from. |
| workspace-id | The ID of your Guard Detect workspace. See Build your ARI. |
| site-url | The URL of your Atlassian site. |
| credentials | The credentials to use to make the request, formatted as <email>:<API Token>. The user who generated the token must have access to all the content to be redacted. |
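The script sends these credentials using HTTP Basic authentication. As a minimal sketch of what that means, the hypothetical helper `basic_auth_header` below base64-encodes the `<email>:<API Token>` string into an `Authorization` header value, mirroring what the script's `execute_graphql` function does. The check for a colon is an addition for illustration, and `user:pass` is a placeholder rather than a real credential.

```python
import base64


def basic_auth_header(credentials: str) -> str:
    """Return the Authorization header value for '<email>:<API Token>' credentials."""
    if ":" not in credentials:
        raise ValueError("credentials must be formatted as <email>:<API Token>")
    encoded = base64.b64encode(credentials.encode("utf-8")).decode("ascii")
    return f"Basic {encoded}"


header = basic_auth_header("user:pass")  # placeholder values only
```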

Check the status of submitted jobs

Once your job is submitted you can use the following command to check its status.

python bulk_redact.py status --workspace-id <workspace ARI> --job-id <job ID> --site-url https://<your-site>.atlassian.net --credentials <email>:<API Token>

Arguments

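| Name | Description |
| --- | --- |
| workspace-id | The ID of your Guard Detect workspace. |
| job-id | The ID of the job to check, as returned by the submit command. |
| site-url | The URL of your Atlassian site. |
| credentials | The credentials to use to make the request, formatted as <email>:<API Token>. |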
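The status command prints the job status with a progress count, and exits with code 0 when the job has succeeded, partially succeeded, or is still in progress, and with 1 otherwise. The sketch below reproduces that logic for a single status result so it can be reused in other tooling. `summarise_status` is a hypothetical helper, and the sample values (including the `FAILED` job status) are made up for illustration.

```python
def summarise_status(status_result: dict) -> tuple[str, int]:
    """Return a one-line summary and the exit code the script would use."""
    job_status = status_result.get("jobStatus", "UNKNOWN")
    total = status_result.get("totalRedactions") or 0
    completed = status_result.get("completedRedactions") or 0
    progress = f"{completed}/{total}" if total else "0/?"
    # A job that is still running is not treated as an error
    ok = job_status in ("SUCCEEDED", "PARTIALLY_SUCCEEDED", "IN_PROGRESS")
    return f"{job_status} ({progress} redactions completed)", 0 if ok else 1


# Made-up results shaped like the fields the status query requests
running = summarise_status({"jobStatus": "IN_PROGRESS", "totalRedactions": 200, "completedRedactions": 50})
failed = summarise_status({"jobStatus": "FAILED"})
```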

Script

#!/usr/bin/env python3
"""
Bulk Redaction Script

Reads a CSV file containing sensitive content detections and submits them
for redaction via the bulkRedact GraphQL mutation.

Commands:
    submit  - Submit redaction items from a CSV file
    status  - Check status of a redaction job

Usage:
    # Submit redactions from a CSV file
    python bulk_redact.py submit --csv-file <file> --workspace-id <workspace_ari>

    # Check status of a job (copy job ID from submit output)
    python bulk_redact.py status --workspace-id <workspace_ari> --job-id <job_id>

Environment variables:
    BULK_REDACT_SITE_URL - Atlassian site URL (e.g., https://example.atlassian.net)
    BULK_REDACT_CREDENTIALS - Basic auth credentials (username:api_token)

Note:
    The entire CSV must fit within a single job (max payload ~500KB).
    If the CSV is too large, the script will fail with an error.
"""

import argparse
import base64
import csv
import json
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
import urllib.request
import urllib.error

# Maximum size of a GraphQL operation in bytes
MAX_OPERATION_SIZE_BYTES = 500 * 1024  # 500KB

# Default error message when none provided
DEFAULT_ERROR_MESSAGE = "Unknown error"


@dataclass
class ContentLocation:
    index: int
    pointer: str


@dataclass
class RedactionItem:
    resource_ari: str
    field_id: str
    detection: str
    start: ContentLocation
    end: ContentLocation
    id: Optional[str] = None
    timestamp: Optional[str] = None
    content_hash: Optional[str] = None

    def to_graphql_input(self) -> dict:
        """Convert to GraphQL input format."""
        result = {
            "resourceAri": self.resource_ari,
            "fieldId": self.field_id,
            "detection": self.detection,
            "start": {"index": self.start.index, "pointer": self.start.pointer},
            "end": {"index": self.end.index, "pointer": self.end.pointer},
        }
        if self.id:
            result["id"] = self.id
        if self.timestamp:
            result["timestamp"] = self.timestamp
        if self.content_hash:
            result["contentHash"] = self.content_hash
        return result


def translate_field_id(field_id: str) -> str:
    """Translate CSV values to correct field ID for Confluence."""
    if field_id == "confluence_page_body":
        return "body"
    elif field_id == "confluence_page_title":
        return "title"
    elif field_id == "confluence_comment":
        return "body"
    elif field_id == "confluence_blog_body":
        return "body"
    elif field_id == "confluence_blog_title":
        return "title"
    else:
        return field_id


def fix_resource_ari(resource_ari: str, data_field: str) -> tuple[str, str]:
    """Fix the resource ARI to be a valid ARI."""
    if "activation" in resource_ari:
        components = resource_ari.split(":")
        last_component = components[-1]
        path_components = last_component.split("/")
        new_last_component = path_components[0] + "/" + path_components[-1]
        components[-1] = new_last_component
        resource_ari = ":".join(components)
    if "comment" in data_field and "jira" in resource_ari:
        comment_id = data_field.replace("comment", "")
        if len(comment_id) > 0:
            data_field = "comment"
            resource_ari = resource_ari.replace("issue", "issue-comment")
            resource_ari = resource_ari + "/" + comment_id

    return resource_ari, data_field


def get_data_column(row: dict, suffix: str, required: bool = True) -> Optional[str]:
    """Get a column value that may start with 'Data' or 'Sensitive data'.

    Args:
        row: The CSV row dictionary
        suffix: The part after "Data " or "Sensitive data " (e.g., "field", "location: start position")
        required: If True, raises KeyError when column is not found

    Returns:
        The column value, or None if not found and not required
    """
    # Try "Data ..." first
    data_key = f"Data {suffix}"
    if data_key in row:
        return row[data_key]

    # Try "Sensitive data ..." as fallback
    sensitive_key = f"Sensitive data {suffix}"
    if sensitive_key in row:
        return row[sensitive_key]

    if required:
        raise KeyError(f"Column not found: '{data_key}' or '{sensitive_key}'")
    return None


def parse_csv_row(row: dict) -> RedactionItem:
    """Parse a CSV row into a RedactionItem."""

    resource_ari, data_field = fix_resource_ari(row["Content ARI"], get_data_column(row, "field"))

    return RedactionItem(
        id=row.get("Finding ID"),
        resource_ari=resource_ari,
        field_id=translate_field_id(data_field),
        detection=row["Detection ID"],
        start=ContentLocation(
            index=int(get_data_column(row, "location: start position")),
            pointer=get_data_column(row, "location: first node"),
        ),
        end=ContentLocation(
            index=int(get_data_column(row, "location: end position")),
            pointer=get_data_column(row, "location: last node"),
        ),
        timestamp=row.get("Updated at"),
        content_hash=get_data_column(row, "hash", required=False),
    )


def load_csv_file(csv_file: Path) -> list[RedactionItem]:
    """Load a CSV file and return redaction items."""
    items = []
    print(f"Loading {csv_file}...")
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            try:
                items.append(parse_csv_row(row))
            except (KeyError, ValueError) as e:
                print(f"  Warning: Skipping row due to error: {e}")
                continue
    print(f"  Loaded {len(items)} items")
    return items


def create_mutation_payload(workspace_id: str, redactions: list[dict]) -> dict:
    """Create the GraphQL mutation payload."""
    mutation = """
mutation BulkRedact($input: ShepherdBulkRedactionInput!) {
  shepherd {
    redaction {
      bulkRedact(input: $input) @optIn(to: "ShepherdBulkRedaction") {
        success
        errors {
          message
          extensions {
            errorType
          }
        }
        jobId
      }
    }
  }
}
"""
    return {
        "query": mutation,
        "variables": {
            "input": {
                "workspaceId": workspace_id,
                "redactions": redactions,
            }
        }
    }


def create_status_query_payload(workspace_id: str, job_id: str) -> dict:
    """Create the GraphQL status query payload."""
    query = """
query CheckBulkRedactionStatus($workspaceId: ID!, $jobId: ID!) {
  shepherd {
    redaction {
      checkBulkRedactionStatus(workspaceId: $workspaceId, jobId: $jobId) @optIn(to: "ShepherdBulkRedaction") {
        jobStatus
        totalRedactions
        completedRedactions
        redactionErrors {
          id
          status
          errors
        }
      }
    }
  }
}
"""
    return {
        "query": query,
        "variables": {
            "workspaceId": workspace_id,
            "jobId": job_id,
        }
    }


def execute_graphql(api_url: str, credentials: str, payload: dict) -> dict:
    """Execute a GraphQL request using Basic auth."""
    data = json.dumps(payload).encode("utf-8")

    # Encode credentials for Basic auth
    encoded_credentials = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")

    request = urllib.request.Request(
        api_url,
        data=data,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {encoded_credentials}",
        },
        method="POST"
    )

    try:
        with urllib.request.urlopen(request, timeout=60) as response:
            return json.loads(response.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        error_body = e.read().decode("utf-8") if e.fp else ""
        # Chain the original exception so the underlying cause stays in the traceback
        raise RuntimeError(f"HTTP {e.code}: {e.reason}\n{error_body}") from e
    except urllib.error.URLError as e:
        raise RuntimeError(f"URL Error: {e.reason}") from e


def submit_redactions(
    api_url: str,
    credentials: str,
    workspace_id: str,
    items: list[RedactionItem],
    verbose: bool = False,
) -> Optional[str]:
    """Submit items for redaction. Returns job_id if successful."""
    print(f"\nSubmitting {len(items)} items for redaction...")

    redactions = [item.to_graphql_input() for item in items]
    payload = create_mutation_payload(workspace_id, redactions)

    if verbose:
        print("Payload items:")
        print(json.dumps(redactions, indent=2))

    try:
        result = execute_graphql(api_url, credentials, payload)
    except RuntimeError as e:
        print(f"ERROR: Request failed: {e}")
        return None


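    # GraphQL can return null for "data" or any nested field when a request fails,
    # so fall back to an empty dict at each level instead of calling .get() on None.
    data = result.get("data") or {}
    redaction_result = (data.get("shepherd") or {}).get("redaction") or {}
    bulk_redact_result = redaction_result.get("bulkRedact") or {}
    # Use top-level GraphQL errors when the mutation itself returned none
    errors = bulk_redact_result.get("errors") or result.get("errors") or []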
    if not bulk_redact_result.get("success"):
        print("Mutation failed!")
        if errors:
            print("\nErrors returned from API:")
            for i, error in enumerate(errors, 1):
                message = error.get("message", DEFAULT_ERROR_MESSAGE)
                extensions = error.get("extensions", {})
                error_type = extensions.get("errorType", "UNKNOWN")
                print(f"  {i}. [{error_type}] {message}")
        else:
            print("  No detailed error information available")
        return None

    if errors:
        print("Job created, but there were some errors")
        for i, error in enumerate(errors, 1):
            message = error.get("message", DEFAULT_ERROR_MESSAGE)
            extensions = error.get("extensions", {})
            error_type = extensions.get("errorType", "UNKNOWN")
            print(f"  {i}. [{error_type}] {message}")
    else:
        print("Mutation succeeded with no errors")

    job_id = bulk_redact_result.get("jobId")
    redaction_ids = bulk_redact_result.get("redactionIds", [])
    if redaction_ids:
        print(f"Redaction IDs: {', '.join(redaction_ids)}")
    print(f"Success! Job ID: {job_id}")

    return job_id


def check_job_status_once(
    api_url: str,
    credentials: str,
    workspace_id: str,
    job_id: str,
) -> tuple[str, bool, int, int]:
    """Check job status once (no polling). Returns (status, success, completed, total)."""
    print(f"\nChecking status for job {job_id}...")

    payload = create_status_query_payload(workspace_id, job_id)

    try:
        result = execute_graphql(api_url, credentials, payload)
    except RuntimeError as e:
        print(f"  Error: {e}")
        return "ERROR", False, 0, 0

    if "errors" in result:
        print("  Request failed with GraphQL errors:")
        for i, error in enumerate(result["errors"], 1):
            message = error.get("message", DEFAULT_ERROR_MESSAGE)
            path = error.get("path", [])
            print(f"    {i}. {message}")
            if path:
                print(f"       Path: {' > '.join(str(p) for p in path)}")
        return "ERROR", False, 0, 0

    status_result = (
        result.get("data", {})
        .get("shepherd", {})
        .get("redaction", {})
        .get("checkBulkRedactionStatus", {})
    )

    job_status = status_result.get("jobStatus", "UNKNOWN")
    total_redactions = status_result.get("totalRedactions", 0)
    completed_redactions = status_result.get("completedRedactions", 0)
    redaction_errors = status_result.get("redactionErrors", [])

    progress = f"{completed_redactions}/{total_redactions}" if total_redactions else "0/?"
    print(f"  Status: {job_status} ({progress} redactions completed)")

    if redaction_errors:
        failed = [e for e in redaction_errors if e.get("status") == "FAILED"]
        if failed:
            print(f"  Failed items: {len(failed)}")
            for error in failed:
                print(f"    - {error.get('id')}: {error.get('errors')}")

    success = job_status in ("SUCCEEDED", "PARTIALLY_SUCCEEDED")
    return job_status, success, completed_redactions, total_redactions


def run_status(args) -> None:
    """Run the status check command."""
    if not args.site_url:
        print("Error: --site-url or BULK_REDACT_SITE_URL environment variable is required")
        sys.exit(1)
    if not args.credentials:
        print("Error: --credentials or BULK_REDACT_CREDENTIALS environment variable is required")
        sys.exit(1)

    # Build the full API URL from the site URL
    api_url = args.site_url.rstrip("/") + "/gateway/api/graphql"

    status, success, _, _ = check_job_status_once(
        api_url,
        args.credentials,
        args.workspace_id,
        args.job_id,
    )

    # Exit with appropriate code
    if success:
        sys.exit(0)
    elif status == "IN_PROGRESS":
        sys.exit(0)  # Still running is not an error
    else:
        sys.exit(1)


def run_submit(args) -> None:
    """Run the submit command."""
    # Validate required args (unless dry run)
    if not args.dry_run:
        if not args.site_url:
            print("Error: --site-url or BULK_REDACT_SITE_URL environment variable is required")
            sys.exit(1)
        if not args.credentials:
            print("Error: --credentials or BULK_REDACT_CREDENTIALS environment variable is required")
            sys.exit(1)

    # Build the full API URL from the site URL
    api_url = args.site_url.rstrip("/") + "/gateway/api/graphql" if args.site_url else None

    # Load items from CSV
    if not args.csv_file.is_file():
        print(f"Error: {args.csv_file} is not a file")
        sys.exit(1)
    items = load_csv_file(args.csv_file)

    if not items:
        print("No items to process")
        sys.exit(0)

    print(f"\nTotal items to redact: {len(items)}")

    # Build payload and check size
    redactions = [item.to_graphql_input() for item in items]
    payload = create_mutation_payload(args.workspace_id, redactions)
    payload_size = len(json.dumps(payload).encode("utf-8"))
    max_size_kb = MAX_OPERATION_SIZE_BYTES / 1024
    payload_size_kb = payload_size / 1024

    print(f"Payload size: {payload_size_kb:.1f} KB (max: {max_size_kb:.0f} KB)")

    # Check if payload exceeds maximum size
    if payload_size > MAX_OPERATION_SIZE_BYTES:
        print(f"\nERROR: Payload size ({payload_size_kb:.1f} KB) exceeds maximum allowed ({max_size_kb:.0f} KB)")
        print(f"The CSV contains {len(items)} items which is too large for a single job.")
        print("Please reduce the number of items in your CSV file and try again.")
        sys.exit(1)

    if args.dry_run:
        print("\nDry run - not submitting request")
        if args.verbose:
            print("Payload items:")
            print(json.dumps(redactions, indent=2))
        sys.exit(0)

    # Submit the job
    job_id = submit_redactions(
        api_url,
        args.credentials,
        args.workspace_id,
        items,
        verbose=args.verbose,
    )

    if job_id:
        print(f"\nCopy this Job ID to check status later: {job_id}")
        sys.exit(0)
    else:
        sys.exit(1)


def main():
    parser = argparse.ArgumentParser(
        description="Bulk redaction script for sensitive content",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Environment variables:
  BULK_REDACT_SITE_URL       Atlassian site URL (e.g., https://example.atlassian.net)
  BULK_REDACT_CREDENTIALS    Basic auth credentials (username:api_token)

Examples:
  # Submit redactions from a CSV file
  python bulk_redact.py submit --csv-file ./data.csv --workspace-id "ari:cloud:..."

  # Check status of a job (use job ID from submit output)
  python bulk_redact.py status --workspace-id "ari:cloud:..." --job-id "abc123"
        """,
    )

    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # Common arguments for both commands
    common_parser = argparse.ArgumentParser(add_help=False)
    common_parser.add_argument(
        "--site-url",
        default=os.environ.get("BULK_REDACT_SITE_URL"),
        help="Atlassian site URL, e.g., https://example.atlassian.net (or set BULK_REDACT_SITE_URL env var)",
    )
    common_parser.add_argument(
        "--credentials",
        default=os.environ.get("BULK_REDACT_CREDENTIALS"),
        help="Basic auth credentials as username:api_token (or set BULK_REDACT_CREDENTIALS env var)",
    )

    # Submit command
    submit_parser = subparsers.add_parser(
        "submit",
        parents=[common_parser],
        help="Submit redaction items from a CSV file",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    submit_parser.add_argument(
        "--csv-file",
        type=Path,
        required=True,
        help="CSV file to process",
    )

    submit_parser.add_argument(
        "--workspace-id",
        required=True,
        help="Workspace ARI (e.g., ari:cloud:beacon::workspace/...)",
    )
    submit_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Check payload size without submitting",
    )
    submit_parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Print the items being sent in the payload (pretty printed)",
    )

    # Status command
    status_parser = subparsers.add_parser(
        "status",
        parents=[common_parser],
        help="Check status of a redaction job",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    status_parser.add_argument(
        "--workspace-id",
        required=True,
        help="Workspace ARI",
    )
    status_parser.add_argument(
        "--job-id",
        required=True,
        help="Job ID to check (from submit command output)",
    )

    args = parser.parse_args()

    if args.command is None:
        parser.print_help()
        sys.exit(1)

    if args.command == "submit":
        run_submit(args)
    elif args.command == "status":
        run_status(args)


if __name__ == "__main__":
    main()
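The script stops when the payload exceeds its 500 KB limit and asks you to reduce the number of items. As a rough sketch of how to find out how many items would fit, the hypothetical helper `count_items_that_fit` below binary-searches for the largest prefix of the redaction list whose JSON payload stays under the limit. It measures the payload the same way the script does (UTF-8 length of the JSON), but the `query` text defaults to an empty string here, so the real mutation text would add a little to the size.

```python
import json

MAX_OPERATION_SIZE_BYTES = 500 * 1024  # same limit as the script


def payload_size_bytes(workspace_id: str, redactions: list[dict], query: str = "") -> int:
    """Size in bytes of the JSON payload for the given redactions."""
    payload = {
        "query": query,
        "variables": {"input": {"workspaceId": workspace_id, "redactions": redactions}},
    }
    return len(json.dumps(payload).encode("utf-8"))


def count_items_that_fit(
    redactions: list[dict],
    workspace_id: str,
    limit: int = MAX_OPERATION_SIZE_BYTES,
    query: str = "",
) -> int:
    """Return the largest n such that the first n redactions fit within limit."""
    low, high = 0, len(redactions)
    while low < high:
        mid = (low + high + 1) // 2
        if payload_size_bytes(workspace_id, redactions[:mid], query) <= limit:
            low = mid       # the first `mid` items fit, try more
        else:
            high = mid - 1  # too large, try fewer
    return low
```

A result smaller than the number of rows in the CSV suggests splitting the file at that row; a result of 0 means the limit is too small for even one item.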
