Sync¶
CLI¶
- class cartography.cli.CLI(sync: Sync | None = None, prog: str | None = None)¶
Bases: object
Command Line Interface for cartography.
This class provides the main command line interface for cartography, handling argument parsing, configuration, and execution of sync operations.
- sync¶
A cartography.sync.Sync object for executing sync operations.
- prog¶
The name of the command line program for display in help output.
- parser¶
The argparse.ArgumentParser instance for parsing command line arguments.
Note
If no sync object is provided, the CLI will create a default sync using cartography.sync.build_default_sync(). The parser is built automatically during initialization.
- main(argv: list[str]) int¶
Main entrypoint for the command line interface.
This method parses command line arguments, configures logging and various service connections, validates input parameters, and executes the cartography sync operation with the provided configuration.
- Parameters:
argv – The command line arguments to parse. Should be a list of strings representing the command line parameters (excluding the program name).
- Returns:
An integer exit code. Returns 0 for successful execution, or a non-zero value for errors or keyboard interruption.
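Examples
Driving the CLI programmatically (a minimal sketch; the --neo4j-uri flag name is an assumption and should be checked against the installed version's help output):
>>> from cartography.cli import CLI
>>> from cartography.sync import build_default_sync
>>> cli = CLI(sync=build_default_sync(), prog='cartography')
>>> exit_code = cli.main(['--neo4j-uri', 'bolt://localhost:7687'])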
- cartography.cli.main(argv=None)¶
Default entrypoint for the cartography command line interface.
This function sets up basic logging configuration and creates a CLI instance with the default cartography sync configuration. It serves as the main entry point when cartography is executed as a command line tool.
- Parameters:
argv – Optional command line arguments. If None, uses sys.argv[1:]. Should be a list of strings representing command line parameters.
- Returns:
Does not return; calls sys.exit() with the appropriate exit code. Exit code 0 indicates successful execution; non-zero indicates an error.
Note
This function configures logging levels for various third-party libraries to reduce noise in the output. It creates a CLI instance using the default sync configuration from cartography.sync.build_default_sync().
The function handles KeyboardInterrupt exceptions gracefully through the CLI.main() method.
Sync¶
- class cartography.sync.Sync¶
Bases: object
A cartography synchronization task orchestrator.
The Sync class is responsible for ensuring the data in the graph database accurately represents the current state of reality. It accomplishes this by executing a sequence of sync “modules” in a specific order. Each module is responsible for:
- Retrieving data from various sources (APIs, files, etc.)
- Pushing that data to the Neo4j graph database
- Removing stale nodes and relationships from the graph
The Sync instance can be configured to run any combination of available modules in a user-defined order, providing flexibility for different deployment scenarios.
- _stages¶
An OrderedDict containing module names mapped to their callable functions.
Examples
Creating a custom sync with specific stages:
>>> sync = Sync()
>>> sync.add_stage('aws', cartography.intel.aws.start_aws_ingestion)
>>> sync.add_stage('analysis', cartography.intel.analysis.run)
Running multiple stages:
>>> stages = [
...     ('create-indexes', cartography.intel.create_indexes.run),
...     ('aws', cartography.intel.aws.start_aws_ingestion),
...     ('analysis', cartography.intel.analysis.run)
... ]
>>> sync.add_stages(stages)
>>> exit_code = sync.run(neo4j_driver, config)
Note
Stages are executed in the order they are added. The ‘create-indexes’ stage should typically be run first, and ‘analysis’ should be run last. Meta-stages for pre-sync, sync, and post-sync hooks may be added in future versions.
- __init__()¶
- add_stage(name: str, func: Callable) None¶
Add a single stage to the sync task.
This method registers a new stage with the sync task. Stages are executed in the order they are added, so the order of add_stage() calls determines the execution sequence.
- Parameters:
name – The unique name identifier for the stage. This name is used for logging and identification purposes.
func – The callable function to execute when this stage runs. The function should accept (neo4j_session, config) parameters.
Note
The stage name should be unique within the sync task. If a stage with the same name already exists, it will be replaced. Stage functions must follow the standard signature (neo4j_session, config).
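Examples
For illustration, a custom stage that follows the required (neo4j_session, config) signature might look like this (the module name and function body are hypothetical):
>>> import logging
>>> logger = logging.getLogger(__name__)
>>> def start_mycompany_ingestion(neo4j_session, config):
...     # Hypothetical stage: fetch data, load it into Neo4j, then remove stale nodes.
...     logger.info("Syncing mycompany data with update_tag %s", config.update_tag)
>>> sync = Sync()
>>> sync.add_stage('mycompany', start_mycompany_ingestion)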
- add_stages(stages: List[Tuple[str, Callable]]) None¶
Add multiple stages to the sync task in batch.
This method is a convenience function for adding multiple stages at once. It iterates through the provided list and calls add_stage() for each stage tuple.
- Parameters:
stages – A list of tuples where each tuple contains (stage_name, stage_function). The stage_name should be a unique string identifier, and stage_function should be a callable that accepts (neo4j_session, config) parameters.
Note
Stages are added in the order they appear in the list, which determines their execution order. This method is equivalent to calling add_stage() for each tuple individually.
- classmethod list_intel_modules() OrderedDict¶
Discover and list all available modules.
This class method dynamically discovers all modules in the cartography.intel package and returns a dictionary mapping module names to their callable ingestion functions. It automatically handles module loading and function discovery using naming conventions.
- Returns:
An OrderedDict where keys are module names and values are callable functions that follow the start_{module}_ingestion pattern. The ‘create-indexes’ module is always included first, and ‘analysis’ is always included last.
Examples
Getting all available modules:
>>> modules = Sync.list_intel_modules()
>>> print(list(modules.keys()))
['create-indexes', 'aws', 'gcp', 'github', ..., 'analysis']
Creating sync with discovered modules:
>>> modules = Sync.list_intel_modules()
>>> sync = Sync()
>>> for name, func in modules.items():
...     sync.add_stage(name, func)
Note
The method uses reflection to discover modules and their ingestion functions. It expects functions to follow the naming pattern start_{module_name}_ingestion. Modules that fail to import are logged as errors but don’t prevent discovery of other modules.
The ‘create-indexes’ and ‘analysis’ modules are handled specially to ensure consistent ordering regardless of discovery order.
- run(neo4j_driver: Driver, config: Config | Namespace) int¶
Execute all configured stages in the sync task sequentially.
This method is the main execution entry point for the sync task. It creates a Neo4j session and executes each configured stage in order, providing comprehensive logging and error handling.
- Parameters:
neo4j_driver – A Neo4j driver instance for database connectivity.
config – Configuration object containing sync parameters including update_tag, neo4j_database, and stage-specific settings.
- Returns:
STATUS_SUCCESS (0) if all stages complete successfully.
- Raises:
KeyboardInterrupt – If user interrupts execution during a stage.
SystemExit – If system exit is triggered during a stage.
Exception – Any unhandled exception from module execution.
Examples
>>> sync = build_default_sync()
>>> driver = GraphDatabase.driver("bolt://localhost:7687")
>>> exit_code = sync.run(driver, config)
>>> if exit_code == STATUS_SUCCESS:
...     print("Sync completed successfully")
Note
Each stage is executed within the same Neo4j session to maintain transaction context. Stages are responsible for their own error handling, but unhandled exceptions will terminate the sync process.
The method logs the start and completion of each module for monitoring and debugging purposes.
- cartography.sync.build_default_sync() Sync¶
Build the default cartography sync with all available intelligence modules.
This function creates a Sync instance configured with all intelligence modules shipped with cartography. The modules are added in a specific order to ensure proper execution sequence, with ‘create-indexes’ first and ‘analysis’ last.
- Returns:
A fully configured Sync instance with all available intelligence modules.
Examples
Creating and running default sync:
>>> sync = build_default_sync()
>>> exit_code = run_with_config(sync, config)
Inspecting default stages:
>>> sync = build_default_sync()
>>> stage_names = list(sync._stages.keys())
>>> print(f"Default sync includes {len(stage_names)} stages")
Note
The default sync includes all modules defined in TOP_LEVEL_MODULES, which encompasses cloud providers (AWS, GCP, Azure), security tools (CrowdStrike, Okta), development platforms (GitHub), and analysis capabilities. This provides comprehensive infrastructure mapping out of the box.
For custom sync configurations with specific modules, use build_sync() with a selected modules string instead.
- cartography.sync.build_sync(selected_modules_as_str: str) Sync¶
Build a custom sync with user-specified modules.
This function creates a Sync instance configured only with the modules specified in the comma-separated string. It provides a way to run a subset of available intelligence modules rather than the full default set.
- Parameters:
selected_modules_as_str – A comma-separated string of module names to include in the sync (e.g., “aws,gcp,analysis”).
- Returns:
A Sync instance configured with only the specified modules in the order they appear in the input string.
- Raises:
ValueError – If any specified module is invalid (propagated from parse_and_validate_selected_modules).
Examples
Building sync with specific cloud providers:
>>> sync = build_sync("aws,gcp,azure")
>>> # Only AWS, GCP, and Azure modules will run
Building minimal sync with just analysis:
>>> sync = build_sync("create-indexes,analysis")
>>> # Only index creation and analysis will run
Building security-focused sync:
>>> sync = build_sync("create-indexes,okta,crowdstrike,analysis")
>>> # Focus on identity and security platforms
Note
The order of modules in the input string determines their execution order. It’s recommended to include ‘create-indexes’ first and ‘analysis’ last for optimal results. The function validates all module names before creating the sync instance.
- cartography.sync.parse_and_validate_selected_modules(selected_modules: str) List[str]¶
Parse and validate user-selected modules from comma-separated string.
This function takes a comma-separated string of module names provided by the user and validates that each module exists in the available modules. It returns a clean list of validated module names.
- Parameters:
selected_modules – A comma-separated string of module names (e.g., “aws,gcp,analysis”). Module names will be stripped of whitespace.
- Returns:
A list of validated module names that exist in TOP_LEVEL_MODULES.
- Raises:
ValueError – If any specified module is not found in the available modules. The error message includes the invalid input and lists all valid options.
Note
Module names are case-sensitive and must exactly match those defined in TOP_LEVEL_MODULES. The function is tolerant of whitespace around commas but requires exact name matches for validation.
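Examples
A sketch of the expected behavior, assuming 'aws' and 'analysis' are among the modules available on your install:
>>> parse_and_validate_selected_modules("aws, analysis")
['aws', 'analysis']
>>> parse_and_validate_selected_modules("aws,not-a-module")  # raises ValueError listing valid options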
- cartography.sync.run_with_config(sync: Sync, config: Config | Namespace) int¶
Execute a sync task with comprehensive configuration and error handling.
This function serves as a high-level wrapper around Sync.run() that handles Neo4j driver creation, authentication, StatsD configuration, and provides comprehensive error handling for common connection and authentication issues.
- Parameters:
sync – A configured Sync instance with stages to execute.
config – Configuration object containing Neo4j connection settings, authentication credentials, StatsD settings, and other parameters.
- Returns:
STATUS_SUCCESS (0) if sync completes successfully. STATUS_FAILURE (1) if Neo4j connection or authentication fails. Other exit codes may be returned by the sync.run() method.
Examples
Running default sync with configuration:
>>> sync = build_default_sync()
>>> config.neo4j_uri = "bolt://localhost:7687"
>>> config.neo4j_user = "neo4j"
>>> config.neo4j_password = "password"
>>> exit_code = run_with_config(sync, config)
Running with StatsD enabled:
>>> config.statsd_enabled = True
>>> config.statsd_host = "localhost"
>>> config.statsd_port = 8125
>>> exit_code = run_with_config(sync, config)
Note
The function automatically generates an update_tag based on current timestamp if one is not provided in the configuration. It handles Neo4j driver creation, authentication setup, and provides detailed error messages for connection and authentication failures.
If StatsD is enabled in the configuration, it initializes the global StatsD client for metrics collection during the sync process.
Utils¶
- cartography.util.aws_handle_regions(func: AWSGetFunc) AWSGetFunc¶
Decorator to handle AWS regional access errors and opt-in region limitations.
This decorator wraps AWS API functions to gracefully handle client errors that occur when accessing regions that are disabled, require opt-in, or where the account lacks necessary permissions. Instead of failing, the decorated function returns an empty list when these specific errors occur.
The decorator also includes exponential backoff retry logic to handle AWS TooManyRequestsException and other transient errors that may occur during API calls.
- Parameters:
func – An AWS API function that returns an iterable (typically a list) of resources. Should be a ‘get_’ function that queries AWS services.
- Returns:
The decorated function with error handling and retry logic applied. On handled errors, returns an empty list instead of raising exceptions.
Examples
Decorating an AWS resource getter function:
>>> @aws_handle_regions
... def get_ec2_instances(boto3_session, region):
...     ec2 = boto3_session.client('ec2', region_name=region)
...     return ec2.describe_instances()['Reservations']
Note
The decorator handles these specific AWS error codes:
- AccessDenied / AccessDeniedException
- AuthFailure
- AuthorizationError / AuthorizationErrorException
- InvalidClientTokenId
- UnauthorizedOperation
- UnrecognizedClientException
- InternalServerErrorException
For these errors, a warning is logged and an empty list is returned. Other errors are re-raised normally.
The decorator includes retry logic with exponential backoff (max 600 seconds) for handling transient AWS API errors and rate limiting.
This should be used on functions that return lists of AWS resources and need to work across multiple regions, including those that may be disabled or require special permissions.
- cartography.util.aws_paginate(client: client, method_name: str, object_name: str, max_pages: int | None = 10000, **kwargs: Any) Iterable[Dict]¶
Helper function for handling AWS boto3 API pagination with progress logging.
This function provides a convenient wrapper around boto3’s pagination functionality, with built-in progress logging and configurable page limits to prevent runaway API calls.
- Parameters:
client – The boto3 client instance to use for API calls.
method_name – The name of the boto3 client method to paginate.
object_name – The key in the API response containing the list of items.
max_pages – Maximum number of pages to fetch; None for unlimited. Defaults to DEFAULT_MAX_PAGES (10000).
**kwargs – Additional keyword arguments to pass to the paginator.
- Yields:
Individual items from the paginated API response.
Examples
Paginating EC2 instances:
>>> ec2_client = boto3.client('ec2')
>>> for instance in aws_paginate(
...     ec2_client,
...     'describe_instances',
...     'Reservations'
... ):
...     print(instance)
Paginating with filters and limits:
>>> for bucket in aws_paginate(
...     s3_client,
...     'list_objects_v2',
...     'Contents',
...     max_pages=100,
...     Bucket='my-bucket',
...     Prefix='logs/'
... ):
...     print(bucket)
Note
The function logs progress every 100 pages to help monitor long-running operations. If the specified object_name is not found in a response page, a warning is logged but iteration continues.
The max_pages limit is enforced to prevent excessive API calls that could hit rate limits or take excessive time. A warning is logged when the limit is reached.
- cartography.util.backoff_handler(details: Dict) None¶
Log backoff retry attempts for monitoring and debugging.
This handler function is called by the backoff decorator when retries are being performed. It provides visibility into retry patterns and helps with debugging API rate limiting or connectivity issues.
- Parameters:
details – Dictionary containing backoff information including:
- wait: Number of seconds to wait before retry
- tries: Number of attempts made so far
- target: The function being retried
Examples
The function is typically used automatically by backoff decorators:
>>> @backoff.on_exception(
...     backoff.expo,
...     Exception,
...     on_backoff=backoff_handler
... )
... def api_call():
...     # Make API call that might fail
...     pass
Note
This function logs at WARNING level to ensure visibility of retry operations in standard logging configurations. The message includes timing information and function identification for debugging. The backoff library may provide partial details (e.g. wait can be None when a retry is triggered immediately). Format the message defensively so logging never raises.
- cartography.util.batch(items: Iterable, size: int = 1000) Iterable[List[Any]]¶
Split an iterable into batches of specified size.
This utility function takes any iterable and yields lists of items in batches of the specified size. This is useful for processing large datasets in manageable chunks, especially when working with APIs that have limits on batch operations.
- Parameters:
items – The iterable to split into batches.
size – The maximum size of each batch. Defaults to DEFAULT_BATCH_SIZE.
- Yields:
Lists containing up to ‘size’ items from the original iterable. The last batch may contain fewer items if the total count is not evenly divisible by the batch size.
Examples
Basic batching:
>>> list(batch([1, 2, 3, 4, 5, 6, 7, 8], size=3))
[[1, 2, 3], [4, 5, 6], [7, 8]]
Note
The function uses itertools.islice for memory-efficient processing of large iterables. It doesn’t load the entire iterable into memory at once, making it suitable for processing very large datasets.
The DEFAULT_BATCH_SIZE is optimized for typical Neo4j operations but can be adjusted based on specific use cases and constraints.
- cartography.util.camel_to_snake(name: str) str¶
Convert CamelCase strings to snake_case format.
This utility function converts CamelCase identifiers (commonly used in APIs) to snake_case format (commonly used in Python). It’s useful for normalizing field names when processing API responses.
- Parameters:
name – The CamelCase string to convert.
- Returns:
The converted snake_case string.
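Examples
A brief illustration; exact handling of acronyms and digits may differ in the actual implementation:
>>> camel_to_snake("InstanceType")
'instance_type'
>>> camel_to_snake("dbInstanceIdentifier")
'db_instance_identifier'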
- cartography.util.dict_date_to_epoch(obj: Dict, key: str) int | None¶
Convert a dictionary date value to Unix epoch timestamp.
Deprecated: this method is deprecated. Neo4j can handle datetime ingestion directly, and the datetime format should be preferred over epoch timestamps for better readability and native time operations support.
This utility function retrieves a datetime object from a dictionary and converts it to a Unix epoch timestamp (seconds since 1970-01-01). This is useful for standardizing date representations in Neo4j.
- Parameters:
obj – The dictionary containing the date value.
key – The key to look up in the dictionary.
- Returns:
Unix epoch timestamp as integer if key exists and contains a datetime, None otherwise.
Examples
Converting datetime objects (deprecated approach):
>>> from datetime import datetime
>>> data = {
...     "created": datetime(2023, 1, 15, 10, 30, 0),
...     "modified": datetime(2023, 2, 20, 14, 45, 30)
... }
>>> dict_date_to_epoch(data, "created")
1673779800
>>> dict_date_to_epoch(data, "modified")
1676902530
Note
The function expects the dictionary value to be a datetime object with a timestamp() method. This is commonly used when processing AWS API responses that return datetime objects for timestamps.
Neo4j natively supports datetime objects and provides rich temporal functions for queries. Using datetime objects directly is preferred over epoch timestamps for better readability, timezone support, and access to Neo4j’s temporal functions like date(), time(), and duration().
For new code, consider storing datetime objects directly in Neo4j rather than converting them to epoch timestamps.
- cartography.util.dict_value_to_str(obj: Dict, key: str) str | None¶
Safely convert a dictionary value to string representation.
This utility function retrieves a value from a dictionary and converts it to a string if it exists, or returns None if the key doesn’t exist. This is useful for handling API responses where fields may be missing.
- Parameters:
obj – The dictionary to search in.
key – The key to look up in the dictionary.
- Returns:
String representation of the value if key exists, None otherwise.
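Examples
A short sketch of the expected behavior:
>>> dict_value_to_str({"InstanceId": "i-0abc123", "Port": 443}, "Port")
'443'
>>> dict_value_to_str({"InstanceId": "i-0abc123"}, "missing") is None
True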
- cartography.util.is_service_control_policy_explicit_deny(error: ClientError) bool¶
Return True if the ClientError was caused by an explicit service control policy deny.
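Examples
For illustration, this helper would typically be used inside an except block around an AWS call (the specific client call and logger are hypothetical):
>>> from botocore.exceptions import ClientError
>>> try:
...     s3_client.get_bucket_policy(Bucket='my-bucket')
... except ClientError as e:
...     if is_service_control_policy_explicit_deny(e):
...         logger.warning("Blocked by an SCP explicit deny; skipping")
...     else:
...         raise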
- cartography.util.is_throttling_exception(exc: Exception) bool¶
Determine if an exception is caused by API rate limiting or throttling.
This function checks whether a given exception indicates that an API call was throttled or rate-limited by the service provider. It currently supports AWS boto3 throttling exceptions and can be extended to support other cloud providers’ throttling mechanisms.
- Parameters:
exc – The exception to check for throttling indicators.
- Returns:
True if the exception indicates throttling/rate limiting, False otherwise.
Examples
Checking AWS boto3 exceptions:
>>> import botocore.exceptions
>>> try:
...     # AWS API call that might be throttled
...     s3_client.list_buckets()
... except Exception as e:
...     if is_throttling_exception(e):
...         print("Request was throttled, should retry")
...     else:
...         print("Different type of error occurred")
Integration with backoff decorators:
>>> @backoff.on_exception(
...     backoff.expo,
...     Exception,
...     giveup=lambda e: not is_throttling_exception(e),
...     max_tries=3
... )
... def resilient_api_call():
...     return api_client.get_data()
Note
Currently supports these AWS error codes:
- LimitExceededException: General rate limit exceeded
- Throttling: Request rate too high
The function can be extended to support other cloud providers like GCP (google.api_core.exceptions.TooManyRequests) or Azure as needed.
This function is particularly useful in conjunction with retry decorators or custom retry logic to distinguish between transient throttling errors that should be retried and permanent errors that should not.
See AWS documentation for more details on error handling: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
- cartography.util.load_resource_binary(package: str, resource_name: str) BinaryIO¶
Load a binary resource from a Python package.
This function provides a convenient way to load binary files (like images, compiled data, etc.) that are packaged with cartography modules.
- Parameters:
package – The Python package name containing the resource.
resource_name – The filename of the binary resource to load.
- Returns:
A binary file-like object that can be read from.
Examples
Loading indexes for Neo4j:
>>> binary_data = load_resource_binary(
...     "cartography.data",
...     "indexes.cypher"
... )
>>> content = binary_data.read()
Note
This function uses importlib.resources.open_binary() under the hood, which works with both traditional file-system packages and newer importlib-based resource systems. The returned file object should be properly closed after use.
- cartography.util.make_neo4j_datetime_validator() Callable[[Any], datetime | None]¶
Create a Pydantic BeforeValidator for neo4j.time.DateTime conversion.
Usage with Pydantic v2:
>>> from datetime import datetime
>>> from typing import Annotated
>>> from pydantic import BaseModel, BeforeValidator
>>> from cartography.util import to_datetime
>>> Neo4jDateTime = Annotated[datetime, BeforeValidator(to_datetime)]
>>> class MyModel(BaseModel):
...     created_at: Neo4jDateTime
Returns a lambda that can be used with BeforeValidator.
- cartography.util.merge_module_sync_metadata(neo4j_session: Session, group_type: str, group_id: str | int, synced_type: str, update_tag: int, stat_handler: ScopedStatsClient) None¶
Create or update ModuleSyncMetadata nodes to track sync operations.
This function creates ModuleSyncMetadata nodes that record when specific resource types were synchronized within a particular scope. This metadata is used for tracking sync completeness and data freshness.
- Parameters:
neo4j_session – Active Neo4j session for executing the metadata update.
group_type – The parent module’s node label (e.g., ‘AWSAccount’).
group_id – The unique identifier of the parent module instance.
synced_type – The sub-module’s node label that was synced (e.g., ‘S3Bucket’).
update_tag – Timestamp used to determine data freshness.
stat_handler – StatsD client for sending metrics about the sync operation.
Examples
Recording S3 bucket sync for an AWS account:
>>> merge_module_sync_metadata(
...     neo4j_session,
...     group_type="AWSAccount",
...     group_id="123456789012",
...     synced_type="S3Bucket",
...     update_tag=1234567890,
...     stat_handler=stats_client
... )
Note
The function creates a unique ModuleSyncMetadata node with an ID constructed from the group_type, group_id, and synced_type. This ensures one metadata record per sync scope. The function also sends a StatsD metric with the update timestamp for monitoring.
The ‘types’ used should be actual Neo4j node labels present in the graph schema.
- cartography.util.retries_with_backoff(func: Callable, exception_type: Type[Exception], max_tries: int, on_backoff: Callable) Callable¶
Add exponential backoff retry logic to any function.
This decorator function wraps any callable with retry logic that uses exponential backoff. When the specified exception type is raised, the function will be retried with increasing delays between attempts until the maximum number of tries is reached.
- Parameters:
func – The function to wrap with retry logic. Can be any callable.
exception_type – The specific exception class that should trigger retries. Only this exception type (and its subclasses) will be retried.
max_tries – Maximum number of attempts before giving up. Includes the initial attempt, so max_tries=3 means 1 initial + 2 retries.
on_backoff – Callback function called before each retry attempt. Should accept a dictionary with backoff details (wait, tries, target).
- Returns:
The decorated function with retry logic applied. Preserves the original function’s signature and return type.
Examples
>>> import boto3
>>> def get_s3_objects():
...     return s3_client.list_objects_v2(Bucket='my-bucket')
>>>
>>> resilient_s3_call = retries_with_backoff(
...     get_s3_objects,
...     botocore.exceptions.ClientError,
...     max_tries=4,
...     on_backoff=backoff_handler
... )
Note
The function uses exponential backoff with jitter by default, meaning retry delays increase exponentially: ~1s, ~2s, ~4s, ~8s, etc. The exact timing may vary due to jitter to avoid thundering herd problems.
Only the specified exception_type will trigger retries. Other exceptions will be raised immediately without retry attempts.
The on_backoff callback receives a dictionary with keys:
- 'wait': seconds to wait before next retry
- 'tries': number of attempts made so far
- 'target': the function being retried
This is a general-purpose retry utility that can be applied to any function, not just AWS or API calls.
- cartography.util.run_analysis_and_ensure_deps(analysis_job_name: str, resource_dependencies: Set[str], requested_syncs: Set[str], common_job_parameters: Dict[str, Any], neo4j_session: Session) None¶
Conditionally run an analysis job based on resource dependency requirements.
This function checks if all required resource dependencies have been included in the requested syncs before executing the analysis job. This ensures that analysis jobs only run when their prerequisite data is available in the graph.
- Parameters:
analysis_job_name – The filename of the analysis job to run (e.g., “aws_foreign_accounts.json”).
resource_dependencies – Set of resource sync names that must be completed for this analysis job to run. Use empty set if no dependencies.
requested_syncs – Set of resource sync names that were requested in the current cartography execution.
common_job_parameters – Dictionary containing common job parameters used across cartography jobs.
neo4j_session – Active Neo4j session for executing the analysis queries.
Examples
Running analysis with AWS dependencies:
>>> run_analysis_and_ensure_deps(
...     "aws_foreign_accounts.json",
...     {"aws:ec2", "aws:iam"},
...     {"aws:ec2", "aws:iam", "aws:s3"},
...     common_params,
...     neo4j_session
... )  # Will run because all dependencies are satisfied
Skipping analysis due to missing dependencies:
>>> run_analysis_and_ensure_deps(
...     "gcp_analysis.json",
...     {"gcp:compute", "gcp:iam"},
...     {"aws:ec2"},  # Missing GCP dependencies
...     common_params,
...     neo4j_session
... )  # Will skip and log warning
Running analysis with no dependencies:
>>> run_analysis_and_ensure_deps(
...     "general_analysis.json",
...     set(),  # No dependencies
...     {"aws:ec2"},
...     common_params,
...     neo4j_session
... )  # Will always run
Note
If dependencies are not satisfied, the function logs an informational message and returns without executing the analysis job. This prevents analysis jobs from running on incomplete data which could produce misleading results.
- cartography.util.run_analysis_job(filename: str, neo4j_session: Session, common_job_parameters: Dict, package: str = 'cartography.data.jobs.analysis') None¶
Execute an analysis job to enrich existing graph data.
This function is designed for use with the cartography.intel.analysis sync stage. It runs queries from the specified Python package directory to perform analysis operations on the complete graph data. Analysis jobs are intended to run at the end of a full graph sync and apply to all resources across all accounts/projects.
- Parameters:
filename – Name of the JSON file containing the analysis job queries.
neo4j_session – Active Neo4j session for executing the analysis queries.
common_job_parameters – Dictionary containing common parameters used across all cartography jobs (e.g., update_tag).
package – Python package containing the analysis job files. Defaults to “cartography.data.jobs.analysis”.
Examples
Running a standard analysis job:
>>> run_analysis_job(
...     "aws_foreign_accounts.json",
...     neo4j_session,
...     {"UPDATE_TAG": 1234567890}
... )
Running analysis from custom package:
>>> run_analysis_job(
...     "custom_analysis.json",
...     neo4j_session,
...     common_params,
...     package="my_company.analysis_jobs"
... )
Note
Analysis jobs are unscoped and apply to ALL resources in the graph (all AWS accounts, all GCP projects, all Okta organizations, etc.). For scoped analysis, use run_scoped_analysis_job() instead.
The job file must be a valid JSON file containing GraphJob-compatible query definitions.
- cartography.util.run_cleanup_job(filename: str, neo4j_session: Session, common_job_parameters: Dict, package: str = 'cartography.data.jobs.cleanup') None¶
Execute a cleanup job to remove stale data from the graph.
Deprecated: this function is deprecated. For resources that have migrated to the new data model, use GraphJob directly instead of this wrapper function.
This function runs cleanup queries that identify and remove nodes and relationships that are no longer current based on update timestamps. Cleanup jobs are essential for maintaining data freshness and preventing the accumulation of stale data in the graph.
- Parameters:
filename – Name of the JSON file containing the cleanup job queries.
neo4j_session – Active Neo4j session for executing the cleanup queries.
common_job_parameters – Dictionary containing common parameters including the update_tag used to identify stale data.
package – Python package containing the cleanup job files. Defaults to “cartography.data.jobs.cleanup”.
Examples
Running standard cleanup job:
>>> run_cleanup_job(
...     "aws_ec2_cleanup.json",
...     neo4j_session,
...     {"UPDATE_TAG": 1234567890}
... )
Running cleanup from custom package:
>>> run_cleanup_job(
...     "custom_cleanup.json",
...     neo4j_session,
...     common_params,
...     package="my_company.cleanup_jobs"
... )
Note
Cleanup jobs typically use the UPDATE_TAG parameter to identify nodes and relationships that haven’t been updated in the current sync cycle. These stale items are then removed to maintain data accuracy. Cleanup jobs should be run after all data ingestion stages are complete.
For resources migrated to the new data model, prefer using GraphJob directly rather than this wrapper function to ensure compatibility with current cartography patterns and to avoid potential deprecation issues in future versions.
- cartography.util.run_scoped_analysis_job(filename: str, neo4j_session: Session, common_job_parameters: Dict, package: str = 'cartography.data.jobs.scoped_analysis') None¶
Execute a scoped analysis job on a specific sub-resource.
This function runs analysis queries that are scoped to a particular sub-resource (e.g., a specific AWS account) rather than across the entire graph. This is useful for analysis that should be performed within the context of a single organizational unit or account.
- Parameters:
filename – Name of the JSON file containing the scoped analysis job queries.
neo4j_session – Active Neo4j session for executing the analysis queries.
common_job_parameters – Dictionary containing common parameters including scope-specific identifiers (e.g., AWS account ID).
package – Python package containing the scoped analysis job files. Defaults to “cartography.data.jobs.scoped_analysis”.
Examples
Running scoped analysis for AWS account:
>>> common_params = {
...     "UPDATE_TAG": 1234567890,
...     "AWS_ID": "123456789012"
... }
>>> run_scoped_analysis_job(
...     "aws_account_security.json",
...     neo4j_session,
...     common_params
... )
Running scoped analysis from custom package:
>>> run_scoped_analysis_job(
...     "gcp_project_analysis.json",
...     neo4j_session,
...     common_params,
...     package="my_company.scoped_jobs"
... )
Note
Scoped analysis jobs are limited to data within a specific scope (typically defined by parameters like AWS_ID, GCP_PROJECT_ID, etc.). This is in contrast to global analysis jobs that operate across all resources. See the queries in cartography.data.jobs.scoped_analysis for specific examples of scoped analysis patterns.
- cartography.util.timeit(method: F) F¶
Decorator to measure and report function execution time via StatsD.
This decorator automatically measures the execution time of the wrapped function and sends the timing data to a StatsD server if StatsD is enabled in the cartography configuration. The metric name is derived from the function’s module and name.
- Parameters:
method – The function to be timed and measured.
- Returns:
The decorated function with timing instrumentation.
Examples
Decorating a function for timing:
>>> @timeit
... def expensive_operation():
...     # Complex processing here
...     return result
Note
The decorator only performs timing when StatsD is enabled in the cartography configuration. When disabled, it simply calls the original function without any overhead.
The timing metric is sent with the pattern: {module_name}.{function_name}
The decorator preserves the original function’s signature and metadata using functools.wraps, making it transparent to inspection tools and integration tests.
- cartography.util.to_asynchronous(func: Callable[[...], R], *args: Any, **kwargs: Any) Awaitable[R]¶
Execute a synchronous function asynchronously in a threadpool with throttling protection.
This function wraps any synchronous callable to run in the default asyncio threadpool, making it awaitable. It includes built-in protection against throttling errors through automatic retry with exponential backoff. This is a transitional helper until we migrate to Python 3.9’s asyncio.to_thread.
- Parameters:
func – The synchronous function to execute asynchronously.
*args – Positional arguments to pass to the function.
**kwargs – Keyword arguments to pass to the function.
- Returns:
An awaitable that resolves to the function’s return value when executed.
Examples
Converting a synchronous API call to async:
>>> def fetch_data(endpoint, timeout=30):
...     return requests.get(endpoint, timeout=timeout).json()
>>>
>>> async def main():
...     # Run synchronous function asynchronously
...     future = to_asynchronous(fetch_data, "https://api.example.com/data", timeout=10)
...     data = await future
...     return data
Note
Once Python 3.9+ is adopted, consider migrating to asyncio.to_thread() for similar functionality with native asyncio support.
- cartography.util.to_datetime(value: Any) datetime | None¶
Convert a neo4j.time.DateTime object to a Python datetime object.
Neo4j returns datetime fields as neo4j.time.DateTime objects, which are not compatible with standard Python datetime or Pydantic datetime validation. This function converts neo4j.time.DateTime to Python datetime.
- Parameters:
value – A neo4j.time.DateTime object, Python datetime, or None
- Returns:
A Python datetime object or None
- Raises:
TypeError – If value is not a supported datetime type
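Examples
A brief sketch; the neo4j.time.DateTime constructor shown is assumed to match the installed neo4j driver:
>>> from neo4j.time import DateTime
>>> dt = to_datetime(DateTime(2023, 1, 15, 10, 30, 0))
>>> (dt.year, dt.month, dt.day)
(2023, 1, 15)
>>> to_datetime(None) is None
True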
- cartography.util.to_synchronous(*awaitables: Awaitable[Any]) List[Any]¶
Synchronously execute multiple awaitables and return their results.
This function blocks the current thread until all provided awaitables complete, collecting their results into a list. It’s designed for use in synchronous code that needs to execute async functions or consume results from async operations without converting the calling code to async.
- Parameters:
*awaitables – Variable number of awaitable objects (Futures, coroutines, tasks). Each awaitable is provided as a separate argument, not as a list.
- Returns:
List containing the results of all awaitables in the same order they were provided. If any awaitable raises an exception, the entire operation fails.
Examples
Executing multiple async functions synchronously:
>>> async def fetch_user(user_id):
...     # Simulate async API call
...     await asyncio.sleep(0.1)
...     return f"User {user_id}"
>>>
>>> async def fetch_posts(user_id):
...     # Simulate another async API call
...     await asyncio.sleep(0.1)
...     return f"Posts for {user_id}"
>>>
>>> # Execute both async functions from sync code
>>> user_future = fetch_user(123)
>>> posts_future = fetch_posts(123)
>>> results = to_synchronous(user_future, posts_future)
>>> print(results)  # ['User 123', 'Posts for 123']
Note
This function uses asyncio.gather() internally, which means:
- All awaitables run concurrently
- If any awaitable fails, the entire operation fails immediately
- Results are returned in the same order as the input awaitables
This is particularly useful for:
- Legacy synchronous code that needs to call async functions
- Testing async code in synchronous test frameworks
- CLI scripts that need to orchestrate async operations
- Bridge code between sync and async boundaries
For error handling, consider using asyncio.gather(return_exceptions=True) if you need to handle individual failures gracefully. This function does not provide that option currently.
Be aware that this function blocks the calling thread until all awaitables complete. For web applications or other async contexts, prefer using await directly with asyncio.gather().
Stats¶
- class cartography.stats.ScopedStatsClient(prefix: str | None, root: ScopedStatsClient)¶
Bases: object
A proxy wrapper around a StatsD client with scoped metric prefixing capabilities.
This class provides a hierarchical scoping mechanism for StatsD metrics, allowing metric names to be automatically prefixed based on the scope. This enables organized metric namespacing and nested scoping for better metric organization.
- _client¶
The underlying StatsClient instance (class-level).
- Type:
statsd.client.udp.StatsClient
- _scope_prefix¶
The prefix string for this scope level.
- _root¶
Reference to the root ScopedStatsClient instance.
Examples
Basic scoped client usage:
>>> root_client = ScopedStatsClient.get_root_client()
>>> aws_client = root_client.get_stats_client('aws')
>>> aws_client.incr('ec2.instances')  # Metric: aws.ec2.instances
Nested scoping:
>>> root_client = ScopedStatsClient.get_root_client()
>>> aws_client = root_client.get_stats_client('aws')
>>> ec2_client = aws_client.get_stats_client('ec2')
>>> ec2_client.incr('instances')  # Metric: aws.ec2.instances
Timer usage:
>>> client = ScopedStatsClient.get_root_client()
>>> sync_client = client.get_stats_client('sync')
>>> with sync_client.timer('duration'):
...     # Timed operation
...     pass  # Metric: sync.duration
Note
The class maintains a single shared StatsClient instance at the root level. All scoped instances are proxies that prefix metrics before forwarding to the root client. The client must be enabled via set_stats_client() before metrics will be sent.
- __init__(prefix: str | None, root: ScopedStatsClient)¶
- gauge(stat: str, value: int, rate: float = 1.0, delta: bool = False)¶
Report a StatsD gauge metric value.
This method reports a gauge value using the underlying StatsD client. Gauges represent a snapshot value that can go up or down. The metric name is automatically prefixed with the current scope prefix.
- Parameters:
stat – The name of the gauge metric to report.
value – The gauge value to report.
rate – The sample rate (0.0 to 1.0). Only sends data this percentage of the time. Defaults to 1.0 (always send).
delta – If True, the value is added to the current gauge value instead of replacing it. Defaults to False (set absolute value).
- Returns:
The result from the underlying StatsD client, or None if not enabled.
Examples
Setting absolute gauge value:
>>> client = ScopedStatsClient.get_root_client()
>>> aws_client = client.get_stats_client('aws')
>>> aws_client.gauge('active.connections', 42)  # Metric: aws.active.connections
Delta gauge update:
>>> aws_client.gauge('queue.size', 5, delta=True)  # Add 5 to current value
Using sample rate:
>>> aws_client.gauge('memory.usage', 1024, rate=0.5)  # Sample 50%
Note
If the client is not enabled, returns None. When delta=True, the value is added to the existing gauge value rather than replacing it.
- static get_root_client() ScopedStatsClient¶
Create and return the root ScopedStatsClient instance.
This static method creates a root client with no prefix that serves as the base for all scoped clients. The root client maintains the actual StatsClient instance and serves as the entry point for the scoped client hierarchy.
- Returns:
A root ScopedStatsClient instance with no prefix.
- get_stats_client(scope: str) ScopedStatsClient¶
Create a new scoped StatsD client with additional prefix.
This method returns a new ScopedStatsClient proxy that automatically prefixes all metric names with the provided scope, in addition to any existing prefix from the current client.
- Parameters:
scope – The scope prefix to add to metric names. Will be appended to any existing prefix with a dot separator.
- Returns:
A new ScopedStatsClient instance with the combined prefix.
- incr(stat: str, count: int = 1, rate: float = 1.0) None¶
Increment a StatsD counter metric.
This method increments a counter metric using the underlying StatsD client. The metric name is automatically prefixed with the current scope prefix.
- Parameters:
stat – The name of the counter metric to increment.
count – The amount to increment by. May be negative for decrementing. Defaults to 1.
rate – The sample rate (0.0 to 1.0). Only sends data this percentage of the time. The StatsD server accounts for the sample rate. Defaults to 1.0 (always send).
Examples
Basic counter increment:
>>> client = ScopedStatsClient.get_root_client()
>>> sync_client = client.get_stats_client('sync')
>>> sync_client.incr('aws.accounts')  # Metric: sync.aws.accounts
Increment by custom amount:
>>> client.incr('processed.items', count=5)
Using sample rate:
>>> client.incr('high.volume.metric', rate=0.1)  # Sample 10%
- is_enabled() bool¶
Check if the StatsD client is enabled and configured.
This method determines whether the underlying StatsD client has been properly configured and is ready to send metrics. It checks if the root client has a valid StatsClient instance set.
- Returns:
True if the StatsD client is configured and ready to send metrics, False if no client has been set or the client is None.
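Examples
For illustration, is_enabled() can guard work that is only worth doing when metrics are actually being sent (compute_cache_size is hypothetical):
>>> client = ScopedStatsClient.get_root_client()
>>> if client.is_enabled():
...     client.gauge('cache.size', compute_cache_size())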
- set_stats_client(stats_client: StatsClient) None¶
Set the underlying StatsD client for this scoped client hierarchy.
This method configures the actual StatsClient instance that will be used by this scoped client and all other scoped clients in the hierarchy. The client is set at the root level, making it available to all scoped instances derived from this hierarchy.
- timer(stat: str, rate: float = 1.0)¶
Create a StatsD timer context manager for measuring execution time.
This method returns a timer object that can be used as a context manager to measure and report execution time. The metric name is automatically prefixed with the current scope prefix.
- Parameters:
stat – The name of the timer metric to report.
rate – The sample rate (0.0 to 1.0). Only sends data this percentage of the time. The StatsD server accounts for the sample rate. Defaults to 1.0 (always send).
- Returns:
A timer context manager object, or None if the client is not enabled.
Examples
Using as context manager:
>>> client = ScopedStatsClient.get_root_client()
>>> sync_client = client.get_stats_client('sync')
>>> with sync_client.timer('aws.duration'):
...     # Timed operation here
...     time.sleep(1)  # Metric: sync.aws.duration
Manual timer usage:
>>> timer = client.timer('operation.time')
>>> if timer:
...     timer.start()
...     # Do work
...     timer.stop()  # Sends timing metric
- cartography.stats.get_stats_client(prefix: str) ScopedStatsClient¶
Get a scoped StatsD client with the specified prefix.
This function returns a ScopedStatsClient instance that automatically prefixes all metrics with the provided prefix string. It uses the global _scoped_stats_client as the root client.
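Examples
A brief usage sketch; metrics are only emitted once a StatsD client has been configured via set_stats_client():
>>> from cartography.stats import get_stats_client
>>> stats = get_stats_client('cartography.intel.aws.ec2')
>>> stats.incr('instances.synced')  # Metric: cartography.intel.aws.ec2.instances.synced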
- cartography.stats.set_stats_client(stats_client: StatsClient) None¶
Configure the global StatsD client for cartography metrics.
This function sets the underlying StatsClient instance that will be used by all ScopedStatsClient instances for sending metrics to the StatsD server. This should be called once during application initialization when StatsD is enabled.
- Parameters:
stats_client – A configured StatsClient instance that will handle the actual communication with the StatsD server.
Note
This function modifies the global _scoped_stats_client instance. Once set, all subsequent metric operations through ScopedStatsClient instances will use this client to send metrics to StatsD.
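Examples
A minimal initialization sketch, assuming the statsd package's StatsClient(host, port, prefix) constructor; the resulting metric name is illustrative:
>>> import statsd
>>> from cartography.stats import set_stats_client, get_stats_client
>>> set_stats_client(statsd.StatsClient(host='localhost', port=8125, prefix='cartography'))
>>> get_stats_client('sync').incr('runs')  # Metric: cartography.sync.runs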