ORM

Client TX

cartography.client.core.tx.ensure_indexes(neo4j_session: Session, node_schema: CartographyNodeSchema) → None

Create indexes for efficient node and relationship matching.

This function creates indexes for the given CartographyNodeSchema object, including indexes for all relationships defined in its other_relationships and sub_resource_relationship fields. This operation is idempotent and safe to run multiple times.

Parameters:
  • neo4j_session (neo4j.Session) – The Neo4j session for database operations.

  • node_schema (CartographyNodeSchema) – The node schema object to create indexes for. This defines which properties need indexing.

Raises:

ValueError – If any generated query doesn’t start with “CREATE INDEX IF NOT EXISTS”, indicating a potential security issue with the query generation.

Examples

>>> node_schema = CartographyNodeSchema(
...     label='AWSUser',
...     properties={'id': PropertyRef('UserId'), 'name': PropertyRef('UserName')},
...     sub_resource_relationship=make_target_node_matcher({'account_id': PropertyRef('AccountId')})
... )
>>> ensure_indexes(session, node_schema)

Note

  • This ensures that the node lookups (MATCH) performed by the generated ingestion queries are backed by indexes, which keeps relationship creation and queries fast.

  • The id and lastupdated properties automatically have indexes created.

  • All properties included in target node matchers automatically have indexes created.

  • This function should be called before performing any data loading operations.
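The safety check described under Raises can be sketched as follows. This is a minimal illustration and not cartography's actual implementation; the helper name validate_index_queries is hypothetical.

```python
from typing import Iterable, List

# Prefix that every generated index query is documented to start with.
REQUIRED_PREFIX = "CREATE INDEX IF NOT EXISTS"


def validate_index_queries(queries: Iterable[str]) -> List[str]:
    """Return the queries unchanged, or raise ValueError on an unexpected one.

    Hypothetical helper: it mirrors the documented rule that every
    generated query must start with CREATE INDEX IF NOT EXISTS.
    """
    checked = []
    for query in queries:
        if not query.startswith(REQUIRED_PREFIX):
            raise ValueError(f"Unexpected index query: {query!r}")
        checked.append(query)
    return checked


ok = validate_index_queries(
    ["CREATE INDEX IF NOT EXISTS FOR (n:AWSUser) ON (n.id)"]
)
```

Because the accepted statement uses IF NOT EXISTS, running it repeatedly is harmless, which is consistent with the idempotency noted above.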

Create indexes for efficient relationship matching between existing nodes.

This function creates indexes for node fields referenced in the given CartographyRelSchema object. It’s specifically designed for matchlink operations where relationships are created between existing nodes.

Parameters:
  • neo4j_session (neo4j.Session) – The Neo4j session for database operations.

  • rel_schema (CartographyRelSchema) – The relationship schema object to create indexes for. This defines which node properties need indexing for efficient relationship matching.

Raises:

ValueError – If any generated query doesn’t start with “CREATE INDEX IF NOT EXISTS”, indicating a potential security issue with the query generation.

Note

  • This function is only used for load_matchlinks() operations where we match and connect existing nodes.

  • It’s not used for CartographyNodeSchema objects - use ensure_indexes() for those instead.

cartography.client.core.tx.execute_write_with_retry(neo4j_session: Session, tx_func: Any, *args: Any, **kwargs: Any) → Any

Execute a custom transaction function with retry logic for transient errors.

This is a generic wrapper for any custom transaction function that needs retry logic for EntityNotFound and other transient errors. Use this when you have complex transaction logic that doesn’t fit the standard load_graph_data pattern.

Example usage:

>>> def my_custom_tx(tx, data_list, update_tag):
...     for item in data_list:
...         tx.run(query, **item).consume()
...
>>> execute_write_with_retry(
...     neo4j_session, my_custom_tx, data_list, update_tag
... )

Parameters:
  • neo4j_session – The Neo4j session

  • tx_func – The transaction function to execute (takes neo4j.Transaction as first arg)

  • args – Positional arguments to pass to tx_func

  • kwargs – Keyword arguments to pass to tx_func

Returns:

The return value of tx_func
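The general shape of such a retry wrapper can be sketched with the standard library alone. Everything here is an assumption made for illustration: FakeTransientError stands in for the driver's retryable exceptions, call_with_retry is a hypothetical name, and the attempt count and exponential backoff are not taken from cartography.

```python
import time
from typing import Any, Callable, Tuple, Type


class FakeTransientError(Exception):
    """Stand-in for a retryable driver error (hypothetical)."""


def call_with_retry(
    func: Callable[..., Any],
    *args: Any,
    retryable: Tuple[Type[BaseException], ...] = (FakeTransientError,),
    max_attempts: int = 3,
    base_delay: float = 0.0,
    **kwargs: Any,
) -> Any:
    """Call func, retrying on retryable errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except retryable:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error.
            time.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


def flaky(value: int) -> int:
    """Fail twice, then succeed, to exercise the retry path."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise FakeTransientError("try again")
    return value * 2


result = call_with_retry(flaky, 21)
```

Errors outside the retryable tuple propagate immediately, so permanent failures are not masked by the retry loop.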

cartography.client.core.tx.load(neo4j_session: Session, node_schema: CartographyNodeSchema, dict_list: List[Dict[str, Any]], batch_size: int = 10000, **kwargs) → None

Load node data to the graph with automatic indexing.

This is the main entry point for intel modules to write node data to the graph. It automatically ensures that required indexes exist before performing the load operation, optimizing performance and maintaining data integrity.

Parameters:
  • neo4j_session (neo4j.Session) – The Neo4j session for database operations.

  • node_schema (CartographyNodeSchema) – The node schema object that defines the structure of the data being loaded and generates the ingestion query.

  • dict_list (List[Dict[str, Any]]) – The data to load to the graph, represented as a list of dictionaries. Each dictionary represents one node to create or update.

  • batch_size (int) – The number of items to process per transaction. Defaults to 10000.

  • **kwargs – Additional keyword arguments passed to the Neo4j query, such as timestamps, update tags, or other metadata.

Examples

>>> node_schema = CartographyNodeSchema(
...     label='AWSUser',
...     properties={
...         'id': PropertyRef('UserId'),
...         'name': PropertyRef('UserName'),
...         'email': PropertyRef('Email')
...     }
... )
>>> users_data = [
...     {'UserId': 'user1', 'UserName': 'Alice', 'Email': 'alice@example.com'},
...     {'UserId': 'user2', 'UserName': 'Bob', 'Email': 'bob@example.com'}
... ]
>>> load(session, node_schema, users_data, lastupdated=current_time)

Note

  • If dict_list is empty, the function returns early to save processing time.

  • The function automatically creates necessary indexes before loading data.

  • The ingestion query is generated automatically from the node schema.

  • Data is processed in batches for optimal performance.

cartography.client.core.tx.load_graph_data(neo4j_session: Session, query: str, dict_list: List[Dict[str, Any]], batch_size: int = 10000, **kwargs) → None

Load data to the graph using batched operations.

This function processes large datasets by splitting them into manageable batches and executing write transactions for each batch. This approach prevents memory issues and improves performance for large data loads.

This function handles retries for:

  • Network errors (ConnectionResetError)

  • Service unavailability (ServiceUnavailable, SessionExpired)

  • Transient database errors (TransientError)

  • EntityNotFound errors during concurrent operations (ClientError with specific code)

  • BufferError with “cannot be re-sized” during concurrent multi-threaded operations

EntityNotFound errors are retried because they commonly occur during concurrent write operations when multiple threads access the same node space. This is expected behavior in Neo4j’s query execution pipeline, not a permanent failure.

Parameters:
  • neo4j_session (neo4j.Session) – The Neo4j session for database operations.

  • query (str) – The Neo4j write query to execute. This query should be generated using cartography.graph.querybuilder.build_ingestion_query() rather than being handwritten to ensure proper formatting and security.

  • dict_list (List[Dict[str, Any]]) – The data to load to the graph, represented as a list of dictionaries. Each dictionary represents one record to process.

  • batch_size (int) – The number of items to process per transaction. Defaults to 10000.

  • **kwargs – Additional keyword arguments passed to the Neo4j query.

Examples

>>> # Generated query from querybuilder
>>> query = build_ingestion_query(node_schema)
>>> data = [
...     {'id': 'user1', 'name': 'Alice', 'email': 'alice@example.com'},
...     {'id': 'user2', 'name': 'Bob', 'email': 'bob@example.com'}
... ]
>>> load_graph_data(session, query, data, lastupdated=current_time)

Note

  • Data is processed in batches of batch_size records (10,000 by default) to optimize memory usage and transaction performance.

  • This function is typically called by higher-level functions like load() rather than directly by user code.
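The batching described above amounts to slicing dict_list into chunks of at most batch_size items. A minimal sketch, assuming a hypothetical helper name iter_batches and leaving out the transaction and retry handling:

```python
from typing import Any, Dict, Iterator, List


def iter_batches(
    dict_list: List[Dict[str, Any]], batch_size: int = 10000
) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of dict_list with at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(dict_list), batch_size):
        yield dict_list[start:start + batch_size]


records = [{"id": i} for i in range(25)]
batch_lengths = [len(batch) for batch in iter_batches(records, batch_size=10)]
```

Each yielded slice would then be passed to one write transaction, so a failure affects a single batch rather than the whole load.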

Create relationships between existing nodes in the graph.

This is the main entry point for intel modules to write relationships between two existing nodes. It ensures proper indexing and executes the relationship creation query.

Parameters:
  • neo4j_session (neo4j.Session) – The Neo4j session for database operations.

  • rel_schema (CartographyRelSchema) – The relationship schema object used to generate the query and define the relationship structure.

  • dict_list (list[dict[str, Any]]) – The data for creating relationships, represented as a list of dictionaries. Each dictionary must contain the source and target node identifiers.

  • batch_size (int) – The number of items to process per transaction. Defaults to 10000.

  • **kwargs – Additional keyword arguments passed to the Neo4j query. Must include _sub_resource_label and _sub_resource_id for cleanup queries.

Raises:

ValueError – If required kwargs _sub_resource_label or _sub_resource_id are not provided. These are needed for cleanup queries.

Note

  • If dict_list is empty, the function returns early to save processing time.

  • The function automatically ensures that required indexes exist for efficient relationship creation.

cartography.client.core.tx.read_list_of_dicts_tx(tx: Transaction, query: str, **kwargs) → List[Dict[str, Any]]

Execute a Neo4j query and return results as a list of dictionaries.

This function converts each record from the query result into a dictionary, making it easy to work with structured data from Neo4j.

Parameters:
  • tx (neo4j.Transaction) – A Neo4j read transaction object.

  • query (str) – A Neo4j query string that returns one or more fields per record.

  • **kwargs – Additional keyword arguments passed to tx.run().

Returns:

A list of dictionaries, where each dictionary represents one record from the query result. Keys are the field names from the RETURN clause, and values are the corresponding data.

Return type:

List[Dict[str, Any]]

Examples

>>> query = "MATCH (a:TestNode) RETURN a.name AS name, a.age AS age ORDER BY age"
>>> data = neo4j_session.execute_read(read_list_of_dicts_tx, query)
>>> print(data)
[{'name': 'Lisa', 'age': 8}, {'name': 'Homer', 'age': 39}]
>>> # Easy iteration over structured data
>>> for person in data:
...     print(f"{person['name']} is {person['age']} years old")
Lisa is 8 years old
Homer is 39 years old

cartography.client.core.tx.read_list_of_tuples_tx(tx: Transaction, query: str, **kwargs) → List[Tuple[Any, ...]]

Execute a Neo4j query and return results as a list of tuples.

This function converts each record from the query result into a tuple, which is useful for unpacking values during iteration and can be more memory-efficient than dictionaries.

Parameters:
  • tx (neo4j.Transaction) – A Neo4j read transaction object.

  • query (str) – A Neo4j query string that returns one or more fields per record.

  • **kwargs – Additional keyword arguments passed to tx.run().

Returns:

A list of tuples, where each tuple represents one record from the query result. Values are ordered according to the RETURN clause in the query.

Return type:

List[Tuple[Any, ...]]

Examples

>>> query = "MATCH (a:TestNode) RETURN a.name AS name, a.age AS age ORDER BY age"
>>> data = neo4j_session.execute_read(read_list_of_tuples_tx, query)
>>> print(data)
[('Lisa', 8), ('Homer', 39)]
>>> # Easy unpacking during iteration
>>> for name, age in data:
...     print(f"{name} is {age} years old")
Lisa is 8 years old
Homer is 39 years old

Note

The advantage of this function over read_list_of_dicts_tx() is that tuples allow for easy unpacking during iteration and use less memory than dictionaries.

cartography.client.core.tx.read_list_of_values_tx(tx: Transaction, query: str, **kwargs) → List[str | int]

Execute a Neo4j query and return a list of single values.

This function is designed for queries that return a single field per record. It extracts the value from each record and returns them as a list.

Parameters:
  • tx (neo4j.Transaction) – A Neo4j read transaction object.

  • query (str) – A Neo4j query string that returns single values per record. Supported: MATCH (a:TestNode) RETURN a.name ORDER BY a.name Not supported: MATCH (a:TestNode) RETURN a.name, a.age, a.x

  • **kwargs – Additional keyword arguments passed to tx.run().

Returns:

A list of string or integer values from the query results.

Return type:

List[Union[str, int]]

Examples

>>> query = "MATCH (a:TestNode) RETURN a.name ORDER BY a.name"
>>> values = neo4j_session.execute_read(read_list_of_values_tx, query)
>>> print(values)
['Alice', 'Bob', 'Charlie']

Warning

If the query returns multiple fields per record, only the value of the first field will be returned. This is not a supported use case - ensure your query returns only one field per record.
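The first-field behavior in the warning can be illustrated without a database. In this sketch, plain tuples stand in for query records and first_field_values is a hypothetical helper, not part of cartography:

```python
from typing import Any, List, Sequence


def first_field_values(records: Sequence[Sequence[Any]]) -> List[Any]:
    """Keep only the first field of every record."""
    return [record[0] for record in records]


# One field per record: the supported case.
single_field = first_field_values([("Alice",), ("Bob",)])

# Several fields per record: everything after the first field is dropped.
multi_field = first_field_values([("Homer", 39), ("Lisa", 8)])
```

The second call silently loses the ages, which is why a query passed to this kind of reader should return exactly one field.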

cartography.client.core.tx.read_single_dict_tx(tx: Transaction, query: str, **kwargs) → Any

Execute a Neo4j query and return a single dictionary result.

This function is designed for queries that return exactly one record with multiple fields, converting the result into a dictionary.

Parameters:
  • tx (neo4j.Transaction) – A Neo4j read transaction object.

  • query (str) – A Neo4j query string that returns a single record with one or more fields. The query should match exactly one record.

  • **kwargs – Additional keyword arguments passed to tx.run().

Returns:

A dictionary representing the single record from the query result, or None if no record was found. Keys are field names from the RETURN clause, and values are the corresponding data.

Return type:

Any

Examples

>>> query = '''MATCH (a:TestNode{name: "Homer"}) RETURN a.name AS name, a.age AS age'''
>>> result = neo4j_session.execute_read(read_single_dict_tx, query)
>>> print(result)
{'name': 'Homer', 'age': 39}

Warning

  • If the query matches multiple records, only the first record will be returned.

  • For multiple records, use read_list_of_dicts_tx() instead.

  • Ensure your query is specific enough to match exactly one record.

cartography.client.core.tx.read_single_value_tx(tx: Transaction, query: str, **kwargs) → str | int | None

Execute a Neo4j query and return a single value.

This function is designed for queries that return exactly one value (string, integer, or None). It’s useful for retrieving specific attributes from unique nodes.

Parameters:
  • tx (neo4j.Transaction) – A Neo4j read transaction object.

  • query (str) – A Neo4j query string that returns a single value. The query should match exactly one record with one field.

  • **kwargs – Additional keyword arguments passed to tx.run().

Returns:

The single value returned by the query, or None if no record was found.

Return type:

Optional[Union[str, int]]

Examples

>>> query = '''MATCH (a:TestNode{name: "Lisa"}) RETURN a.age'''
>>> value = neo4j_session.execute_read(read_single_value_tx, query)
>>> print(value)
8

Warning

  • If the query matches multiple records, only the first value will be returned.

  • If the query returns complex objects or dictionaries, the behavior is undefined.

  • Ensure your query is specific enough to match exactly one record.

See also

For complex return types, use read_single_dict_tx() or read_list_of_dicts_tx().

cartography.client.core.tx.run_write_query(neo4j_session: Session, query: str, **parameters: Any) → None

Execute a write query inside a managed transaction with retry logic.

This function includes retry logic for:

  • Network errors (ConnectionResetError)

  • Service unavailability (ServiceUnavailable, SessionExpired)

  • Transient database errors (TransientError)

  • EntityNotFound errors during concurrent operations (specific ClientError)

  • BufferError with “cannot be re-sized” during concurrent multi-threaded operations

Used by intel modules that run manual transactions (e.g., GCP firewalls, AWS resources).

Parameters:
  • neo4j_session – The Neo4j session

  • query – The Cypher query to execute

  • parameters – Parameters to pass to the query

Returns:

None

cartography.client.core.tx.write_list_of_dicts_tx(tx: Transaction, query: str, **kwargs) → None

Execute a Neo4j write query with a list of dictionaries.

This function is designed to work with queries that process batches of data using the UNWIND clause, allowing efficient bulk operations on Neo4j.

Parameters:
  • tx (neo4j.Transaction) – A Neo4j write transaction object.

  • query (str) – A Neo4j write query string that typically uses UNWIND to process the $DictList parameter. The query should contain data manipulation operations like MERGE, CREATE, or SET.

  • **kwargs – Additional keyword arguments passed to the Neo4j query, including the DictList parameter containing the data to process.

Examples

>>> from datetime import datetime
>>> dict_list = [
...     {'id': 1, 'name': 'Alice', 'age': 30},
...     {'id': 2, 'name': 'Bob', 'age': 25}
... ]
>>>
>>> neo4j_session.execute_write(
...     write_list_of_dicts_tx,
...     '''
...     UNWIND $DictList as data
...         MERGE (a:Person{id: data.id})
...         SET
...             a.name = data.name,
...             a.age = data.age,
...             a.updated_at = $timestamp
...     ''',
...     DictList=dict_list,
...     timestamp=datetime.now()
... )

Note

This function is typically used internally by higher-level functions like load_graph_data() rather than being called directly by user code.

QueryBuilder

cartography.graph.querybuilder.build_create_index_queries(node_schema: CartographyNodeSchema) → list[str]

Generate queries to create indexes for the given CartographyNodeSchema and all node types attached to it via its relationships.

This function creates Neo4j CREATE INDEX queries for optimal query performance. It handles indexes for the main node schema, its relationships, and any extra labels or properties that require indexing.

Parameters:

node_schema (CartographyNodeSchema) – The Cartography node schema object to create indexes for.

Returns:

A list of CREATE INDEX queries of the form

CREATE INDEX IF NOT EXISTS FOR (n:$TargetNodeLabel) ON (n.$TargetAttribute)

Return type:

List[str]

Examples

>>> node_schema = CartographyNodeSchema(
...     label='AWSUser',
...     properties=Properties(
...         id=PropertyRef('id'),
...         arn=PropertyRef('arn', extra_index=True),
...         name=PropertyRef('name')
...     ),
...     sub_resource_relationship=account_rel,
...     other_relationships=OtherRelationships([role_rel])
... )
>>> queries = build_create_index_queries(node_schema)
>>> # Returns indexes for:
>>> # - AWSUser.id and AWSUser.lastupdated (standard indexes)
>>> # - AWSUser.arn (extra index due to extra_index=True)
>>> # - Target node properties from relationships
>>> # - Extra node labels if present

Note

This function automatically creates indexes for ‘id’ and ‘lastupdated’ fields on all node types, plus any properties marked with extra_index=True. It also indexes target node properties from all relationships.
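The query form shown under Returns can be reproduced with simple string formatting. This is a sketch under stated assumptions: make_index_queries is a hypothetical helper, and the labels and properties are supplied by hand here rather than derived from a CartographyNodeSchema.

```python
from typing import Dict, Iterable, List


def make_index_queries(props_by_label: Dict[str, Iterable[str]]) -> List[str]:
    """Build one CREATE INDEX IF NOT EXISTS query per (label, property) pair."""
    queries = []
    for label, props in props_by_label.items():
        for prop in props:
            queries.append(
                f"CREATE INDEX IF NOT EXISTS FOR (n:{label}) ON (n.{prop})"
            )
    return queries


# Hand-written input: the node's standard and extra indexes, plus the
# matched property of a related node label.
queries = make_index_queries(
    {"AWSUser": ["id", "lastupdated", "arn"], "AWSAccount": ["id"]}
)
```

Labels and property names are interpolated directly into the query text, so in real code they should come from trusted schema definitions and never from user input.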

Generate queries to create indexes for the given CartographyRelSchema and all node types attached to it.

This function creates Neo4j CREATE INDEX queries specifically for matchlink operations, which are used to connect existing nodes in the graph. It creates indexes for both source and target node properties, plus composite relationship indexes.

Parameters:

rel_schema (CartographyRelSchema) – The CartographyRelSchema object to create indexes for.

Returns:

A list of CREATE INDEX queries for source nodes, target nodes, and relationships.

Returns empty list if source_node_matcher is not defined.

Return type:

List[str]

Examples

>>> rel_schema = CartographyRelSchema(
...     source_node_label='User',
...     source_node_matcher=SourceNodeMatcher(id=PropertyRef('user_id')),
...     target_node_label='Role',
...     target_node_matcher=TargetNodeMatcher(name=PropertyRef('role_name')),
...     rel_label='HAS_ROLE',
...     direction=LinkDirection.OUTWARD
... )
>>> queries = build_create_index_queries_for_matchlink(rel_schema)
>>> # Returns:
>>> # - CREATE INDEX IF NOT EXISTS FOR (n:User) ON (n.id)
>>> # - CREATE INDEX IF NOT EXISTS FOR (n:Role) ON (n.name)
>>> # - CREATE INDEX IF NOT EXISTS FOR ()-[r:HAS_ROLE]->() ON (r.lastupdated, r._sub_resource_label, r._sub_resource_id)
>>> # Missing source node matcher
>>> incomplete_rel = CartographyRelSchema(target_node_label='Role', ...)
>>> queries = build_create_index_queries_for_matchlink(incomplete_rel)
>>> # Returns: [] (empty list with warning logged)

Note

This function is only used for load_matchlinks() where we match and connect existing nodes in the graph. It requires source_node_matcher to be defined and creates composite indexes for relationship performance.

cartography.graph.querybuilder.build_ingestion_query(node_schema: CartographyNodeSchema, selected_relationships: set[CartographyRelSchema] | None = None) → str

Generate a Neo4j query from a CartographyNodeSchema to ingest nodes and relationships.

This function creates an optimized Neo4j query that cartography module authors can use instead of handwriting their own queries. It handles node creation, property setting, and relationship attachment in a single optimized query.

Parameters:
  • node_schema (CartographyNodeSchema) – The CartographyNodeSchema object to build a Neo4j query from.

  • selected_relationships (Optional[Set[CartographyRelSchema]], optional) – If specified, generates a query that attaches only the relationships in this set. The RelSchema specified must be present in node_schema.sub_resource_relationship or node_schema.other_relationships. Defaults to None (uses all relationships). If empty set, creates query with no relationships.

Returns:

An optimized Neo4j query that can be used to ingest nodes and relationships.

Return type:

str

Examples

>>> # Basic node schema with relationships
>>> node_schema = CartographyNodeSchema(
...     label='EC2Instance',
...     properties=EC2InstanceProperties(),
...     sub_resource_relationship=account_rel,
...     other_relationships=OtherRelationships([vpc_rel, subnet_rel])
... )
>>> query = build_ingestion_query(node_schema)
>>> # Returns complete ingestion query with all relationships
>>> # Query with selected relationships only
>>> selected_rels = {account_rel, vpc_rel}
>>> query = build_ingestion_query(node_schema, selected_rels)
>>> # Returns query with only account and VPC relationships
>>> # Query with no relationships
>>> query = build_ingestion_query(node_schema, set())
>>> # Returns query that only creates nodes, no relationships

Note

  • The resulting query uses the UNWIND + MERGE pattern for batch loading data efficiently

  • The query assumes a list of dicts will be passed via parameter $DictList

  • The query sets firstseen attributes on all created nodes and relationships

  • The query is intended for use with cartography.client.core.tx.load_graph_data()
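To make the UNWIND + MERGE pattern from these notes concrete, here is a deliberately simplified, nodes-only sketch. It is not the query that build_ingestion_query() produces; build_simple_ingestion_query, the node alias i, and the exact clause layout are assumptions for illustration.

```python
from typing import Dict


def build_simple_ingestion_query(label: str, properties: Dict[str, str]) -> str:
    """Build a simplified UNWIND + MERGE query for nodes only.

    properties maps node property names to keys of each input dict;
    the 'id' entry is used as the MERGE key.
    """
    # lastupdated comes from a query parameter; other values come from each item.
    clauses = ["i.lastupdated = $lastupdated"] + [
        f"i.{prop} = item.{field}"
        for prop, field in properties.items()
        if prop != "id"
    ]
    set_block = ",\n        ".join(clauses)
    return (
        "UNWIND $DictList AS item\n"
        f"    MERGE (i:{label}{{id: item.{properties['id']}}})\n"
        "    ON CREATE SET i.firstseen = timestamp()\n"
        "    SET\n"
        f"        {set_block}"
    )


query = build_simple_ingestion_query(
    "AWSUser", {"id": "UserId", "name": "UserName"}
)
```

UNWIND turns the $DictList parameter into one row per dictionary, and MERGE either matches the existing node by id or creates it, so the same query handles both first-time loads and updates.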

Generate a Neo4j query to link two existing nodes when given a CartographyRelSchema object.

This function creates a Neo4j query specifically for connecting existing nodes in the graph based on their properties. It is designed for use with load_matchlinks() operations.

Parameters:

rel_schema (CartographyRelSchema) –

The CartographyRelSchema object to generate a query for. This object must have:

  • source_node_matcher and source_node_label defined

  • CartographyRelProperties with _sub_resource_label and _sub_resource_id defined

Returns:

A Neo4j query that can be used to link two existing nodes.

Return type:

str

Raises:

ValueError – If the rel_schema does not have a source_node_matcher or source_node_label defined, or if the rel_schema properties do not have _sub_resource_label or _sub_resource_id defined.

Examples

>>> rel_schema = CartographyRelSchema(
...     source_node_label='User',
...     source_node_matcher=SourceNodeMatcher(id=PropertyRef('user_id')),
...     target_node_label='Role',
...     target_node_matcher=TargetNodeMatcher(name=PropertyRef('role_name')),
...     rel_label='HAS_ROLE',
...     direction=LinkDirection.OUTWARD,
...     properties=UserRoleRel(
...         _sub_resource_label=PropertyRef('_sub_resource_label', set_in_kwargs=True),
...         _sub_resource_id=PropertyRef('_sub_resource_id', set_in_kwargs=True)
...     )
... )
>>> query = build_matchlink_query(rel_schema)
>>> # Returns:
>>> # UNWIND $DictList as item
>>> #     MATCH (from:User{id: item.user_id})
>>> #     MATCH (to:Role{name: item.role_name})
>>> #     MERGE (from)-[r:HAS_ROLE]->(to)
>>> #     ON CREATE SET r.firstseen = timestamp()
>>> #     SET r._sub_resource_label = $_sub_resource_label, ...

Note

This function is only used for load_matchlinks() operations where we need to connect existing nodes. The _sub_resource_label and _sub_resource_id properties are required for the cleanup query functionality.

cartography.graph.querybuilder.filter_selected_relationships(node_schema: CartographyNodeSchema, selected_relationships: set[CartographyRelSchema]) → tuple[CartographyRelSchema | None, OtherRelationships | None]

Filter and validate selected relationships against a node schema.

This function ensures that selected relationships specified to build_ingestion_query() are actually present on the node schema. It validates the relationships exist and separates them into sub resource and other relationship categories.

Parameters:
  • node_schema (CartographyNodeSchema) – The node schema object to filter relationships against.

  • selected_relationships (Set[CartographyRelSchema]) – The set of relationships to check if they exist in the node schema. If empty set, this means no relationships have been selected. None is not an accepted value.

Returns:

A tuple containing:
  • Sub resource relationship (if present in selected_relationships)

  • OtherRelationships object containing all other relationships from selected_relationships that are present in the node schema

Return type:

Tuple[Optional[CartographyRelSchema], Optional[OtherRelationships]]

Raises:

ValueError – If any selected relationship is not defined on the node schema.

Examples

>>> node_schema = CartographyNodeSchema(
...     label='EC2Instance',
...     sub_resource_relationship=account_rel,
...     other_relationships=OtherRelationships([vpc_rel, subnet_rel])
... )
>>>
>>> # Select subset of relationships
>>> selected = {account_rel, vpc_rel}
>>> sub_rel, other_rels = filter_selected_relationships(node_schema, selected)
>>> # Returns: (account_rel, OtherRelationships([vpc_rel]))
>>> # Empty set means no relationships selected
>>> sub_rel, other_rels = filter_selected_relationships(node_schema, set())
>>> # Returns: (None, None)
>>> # Invalid relationship raises error
>>> invalid_selected = {unknown_rel}
>>> filter_selected_relationships(node_schema, invalid_selected)
>>> # Raises: ValueError

Note

This function is used internally by build_ingestion_query() to validate and filter the selected_relationships parameter.
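The validate-then-split logic can be sketched with plain strings standing in for relationship schema objects. The function name split_selected and the use of a list in place of OtherRelationships are assumptions for illustration only.

```python
from typing import List, Optional, Set, Tuple


def split_selected(
    sub_resource_rel: Optional[str],
    other_rels: List[str],
    selected: Set[str],
) -> Tuple[Optional[str], Optional[List[str]]]:
    """Validate selected against the schema's relationships, then split it."""
    known = set(other_rels)
    if sub_resource_rel is not None:
        known.add(sub_resource_rel)

    # Reject anything that is not defined on the (stand-in) node schema.
    unknown = selected - known
    if unknown:
        raise ValueError(
            f"Relationships not on the node schema: {sorted(unknown)}"
        )

    chosen_sub = sub_resource_rel if sub_resource_rel in selected else None
    chosen_others = [rel for rel in other_rels if rel in selected]
    return chosen_sub, (chosen_others or None)


subset = split_selected(
    "account_rel", ["vpc_rel", "subnet_rel"], {"account_rel", "vpc_rel"}
)
nothing = split_selected("account_rel", ["vpc_rel", "subnet_rel"], set())
```

Validating before splitting means a typo in the selection fails loudly instead of silently producing a query with fewer relationships than intended.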

cartography.graph.querybuilder.rel_present_on_node_schema(node_schema: CartographyNodeSchema, rel_schema: CartographyRelSchema) → bool

Check if a relationship schema is present on a node schema.

This function determines whether a given relationship schema is defined on the provided node schema, checking both sub resource relationships and other relationships.

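Parameters:
  • node_schema (CartographyNodeSchema) – The node schema object to check.

  • rel_schema (CartographyRelSchema) – The relationship schema object to look for on the node schema.

Returns: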

True if the relationship schema is present on the node schema, False otherwise.

Return type:

bool

Examples

>>> node_schema = CartographyNodeSchema(
...     label='AWSUser',
...     sub_resource_relationship=account_rel,
...     other_relationships=OtherRelationships([role_rel, group_rel])
... )
>>> rel_present_on_node_schema(node_schema, account_rel)
True
>>> rel_present_on_node_schema(node_schema, role_rel)
True
>>> rel_present_on_node_schema(node_schema, unknown_rel)
False

Note

This function is commonly used for validation in cleanup operations and query building to ensure that only valid relationships are processed.

CleanupBuilder

cartography.graph.cleanupbuilder.build_cleanup_queries(node_schema: CartographyNodeSchema, cascade_delete: bool = False) → List[str]

Generate Neo4j queries to clean up stale nodes and relationships.

This function creates appropriate cleanup queries based on the node schema’s configuration, handling different scenarios for scoped and unscoped cleanup operations.

Parameters:
  • node_schema (CartographyNodeSchema) – The node schema object defining the structure and cleanup behavior for the target nodes.

  • cascade_delete (bool) – If True, also delete all child nodes that have a relationship to stale nodes matching node_schema.sub_resource_relationship.rel_label. Defaults to False to preserve existing behavior. Only valid when scoped_cleanup=True.

Returns:

A list of Neo4j queries to clean up stale nodes and relationships.

Returns an empty list if the node has no relationships (e.g., SyncMetadata nodes).

Return type:

List[str]

Raises:

ValueError – If the node schema has a sub resource relationship but scoped_cleanup is False, which creates an inconsistent configuration.

Note

The function handles four distinct cases:

  1. Standard scoped cleanup: Node has sub resource + scoped cleanup = True → Clean up stale nodes scoped to the sub resource

  2. Invalid configuration: Node has sub resource + scoped cleanup = False → Raises ValueError (inconsistent state)

  3. Relationship-only cleanup: No sub resource + scoped cleanup = True → Clean up only stale relationships, preserve nodes

  4. Unscoped cleanup: No sub resource + scoped cleanup = False → Clean up all stale nodes regardless of scope

Nodes without relationships (like SyncMetadata) are left for manual management.
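The four cases above form a small decision table, sketched here as a pure function. The name cleanup_mode and the returned mode labels are hypothetical; the real function returns cleanup queries, not labels.

```python
def cleanup_mode(has_sub_resource: bool, scoped_cleanup: bool) -> str:
    """Map the two configuration flags to one of the documented cases."""
    if has_sub_resource and scoped_cleanup:
        # Case 1: clean up stale nodes scoped to the sub resource.
        return "scoped"
    if has_sub_resource and not scoped_cleanup:
        # Case 2: inconsistent configuration.
        raise ValueError(
            "A node with a sub resource relationship requires scoped cleanup"
        )
    if scoped_cleanup:
        # Case 3: clean up only stale relationships, preserve nodes.
        return "relationships_only"
    # Case 4: clean up all stale nodes regardless of scope.
    return "unscoped"


modes = [
    cleanup_mode(True, True),
    cleanup_mode(False, True),
    cleanup_mode(False, False),
]
```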

Generate a cleanup query for matchlink relationships.

This function creates a Neo4j query to clean up stale matchlink relationships that are scoped to specific sub resources. It’s used to maintain data consistency by removing outdated relationship connections.

Parameters:

rel_schema (CartographyRelSchema) –

The relationship schema object to generate a query for. Must have the following requirements:

  • source_node_matcher and source_node_label defined

  • CartographyRelProperties object with _sub_resource_label and _sub_resource_id defined

Returns:

A Neo4j query to clean up stale matchlink relationships, scoped to the specified sub resource.

Return type:

str

Raises:

ValueError – If the rel_schema does not have a source_node_matcher defined.

Note

  • The query includes scoping clauses to ensure only relationships associated with the specified sub resource are cleaned up.

  • Relationship direction is automatically determined from the schema configuration.

  • The query uses parameterized values for security and consistency.

GraphJob

class cartography.graph.job.GraphJob(name: str, statements: List[GraphStatement], short_name: str | None = None)

Bases: object

A job that executes a sequence of Neo4j statements against the cartography graph.

A GraphJob represents a complete unit of work that consists of one or more GraphStatements executed sequentially. Jobs are commonly used for data loading, cleanup operations, and complex graph transformations.

Parameters:
  • name (str) – Human-readable name for the job (e.g., “AWS EC2 instance cleanup”).

  • statements (List[GraphStatement]) – List of GraphStatement objects to execute in sequence.

  • short_name (Optional[str]) – Short identifier/slug for the job (e.g., “ec2_cleanup”). If not provided, logging will use the full name.

name

The human-readable job name.

Type:

str

statements

The list of statements to execute.

Type:

List[GraphStatement]

short_name

The job’s short identifier for logging.

Type:

Optional[str]

Examples

Creating a simple job:

>>> from cartography.graph.statement import GraphStatement
>>> stmt1 = GraphStatement("MATCH (n:TestNode) SET n.processed = true")
>>> stmt2 = GraphStatement("MATCH (n:TestNode) WHERE n.stale = true DELETE n")
>>> job = GraphJob("Test cleanup job", [stmt1, stmt2], "test_cleanup")
>>> job.name
'Test cleanup job'
>>> len(job.statements)
2

Creating a job from a node schema:

>>> parameters = {"UPDATE_TAG": 1642784400, "account_id": "123456789012"}
>>> job = GraphJob.from_node_schema(aws_ec2_schema, parameters)
>>> job.name
'Cleanup AWSEC2Instance'

Note

  • Statements are executed in the order they appear in the list.

  • If any statement fails, the entire job fails and subsequent statements are not executed.

  • Jobs automatically handle parameter merging and provide structured logging.
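The ordering and fail-fast behavior in this note can be sketched with plain callables in place of GraphStatement objects. The names make_statement and run_job are hypothetical, and no database is involved.

```python
from typing import Callable, List

executed: List[str] = []


def make_statement(name: str, fail: bool = False) -> Callable[[], None]:
    """Create a stand-in statement that records its name or raises."""
    def run() -> None:
        if fail:
            raise RuntimeError(f"{name} failed")
        executed.append(name)
    return run


def run_job(statements: List[Callable[[], None]]) -> None:
    """Run statements in list order; the first exception aborts the job."""
    for statement in statements:
        statement()


job_failed = False
try:
    run_job([
        make_statement("first"),
        make_statement("second", fail=True),
        make_statement("third"),
    ])
except RuntimeError:
    job_failed = True
```

After the run, only the first statement has executed: the failure of the second one stops the loop before the third is reached.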

__init__(name: str, statements: List[GraphStatement], short_name: str | None = None)

Initialize a new GraphJob instance.

Parameters:
  • name (str) – Human-readable name for the job.

  • statements (List[GraphStatement]) – List of statements to execute sequentially.

  • short_name (Optional[str]) – Short identifier for logging purposes.

as_dict() Dict

Convert the job to a dictionary representation.

This method serializes the job to a dictionary format suitable for JSON serialization or other forms of persistence.

Returns:

A dictionary containing the job’s name, statements, and short_name.

The statements are also converted to their dictionary representations.

Return type:

Dict

Examples

>>> job = GraphJob("Test job", [statement], "test_job")
>>> job_dict = job.as_dict()
>>> job_dict.keys()
dict_keys(['name', 'statements', 'short_name'])
>>> job_dict['name']
'Test job'

classmethod from_json(blob: str | dict, short_name: str | None = None) GraphJob

Create a GraphJob instance from a JSON string or dictionary.

This class method deserializes a JSON string, or an already-parsed dictionary, into a GraphJob object, reconstructing all statements and their parameters.

Parameters:
  • blob (Union[str, dict]) – The JSON string or dictionary to deserialize. Must contain a ‘name’ field and a ‘statements’ field with a list of statement definitions. If a string is provided, it will be parsed as JSON. If a dictionary is provided, it will be used directly.

  • short_name (Optional[str]) – Override the short name for the job. If not provided, uses the short_name from JSON if available.

Returns:

A new GraphJob instance created from the JSON data.

Return type:

GraphJob

Raises:
  • json.JSONDecodeError – If the blob is not valid JSON.

  • KeyError – If the JSON doesn’t contain required fields (‘name’, ‘statements’).

Examples

>>> json_str = '''
... {
...     "name": "Test Job",
...     "statements": [
...         {
...             "query": "MATCH (n:TestNode) RETURN count(n)",
...             "parameters": {}
...         }
...     ]
... }
... '''
>>> job = GraphJob.from_json(json_str, "test_job")
>>> job.name
'Test Job'
>>> job.short_name
'test_job'
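The handling of the two accepted blob types can be sketched as follows. This is an illustration under assumptions, not the library's code: load_job_blob is a hypothetical helper, and the explicit field check stands in for whatever lookup raises KeyError in the real method.

```python
import json
from typing import Any, Dict, Union


def load_job_blob(blob: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical helper: parse a str blob as JSON, use a dict as-is."""
    data = json.loads(blob) if isinstance(blob, str) else blob
    # Both fields are documented as required.
    for field in ("name", "statements"):
        if field not in data:
            raise KeyError(field)
    return data


from_text = load_job_blob('{"name": "Test Job", "statements": []}')
from_dict = load_job_blob({"name": "Test Job", "statements": []})
```

Either input form yields the same dictionary, so callers that already hold parsed JSON do not need to re-serialize it.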

classmethod from_json_file(file_path: str | Path) GraphJob

Create a GraphJob instance from a JSON file.

This class method reads a JSON file and deserializes it into a GraphJob object. The job’s short name is automatically derived from the file path.

Parameters:

file_path (Union[str, Path]) – The path to the JSON file to read and parse.

Returns:

A new GraphJob instance created from the JSON file.

Return type:

GraphJob

Raises:
  • FileNotFoundError – If the specified file does not exist.

  • json.JSONDecodeError – If the file contains invalid JSON.

  • KeyError – If the JSON file doesn’t contain required fields (‘name’, ‘statements’).

Create a cleanup job for matchlink relationships.

This class method generates a cleanup job specifically for cleaning up stale relationships created by load_matchlinks() operations. It should only be used for matchlink cleanup and not for general relationship cleanup.

Parameters:
  • rel_schema (CartographyRelSchema) – The relationship schema object defining the structure of relationships to clean up. Must have source_node_matcher and target_node_matcher defined.

  • sub_resource_label (str) – The label of the sub-resource to scope cleanup to.

  • sub_resource_id (str) – The ID of the sub-resource to scope cleanup to.

  • update_tag (int) – The update tag to identify stale relationships.

  • iterationsize (int, optional) – The number of items to process in each iteration. Defaults to 100.

Returns:

A new GraphJob instance configured for matchlink cleanup.

Return type:

GraphJob

Note

  • This method is specifically designed for matchlink cleanup operations.

  • Required relationship properties _sub_resource_label and _sub_resource_id must be defined in the rel_schema.

  • For a given rel_schema, the fields used in rel_schema.properties._sub_resource_label.name and rel_schema.properties._sub_resource_id.name must be provided as keys and values in the params dict.

  • The rel_schema must have a source_node_matcher and target_node_matcher.

classmethod from_node_schema(node_schema: CartographyNodeSchema, parameters: Dict[str, Any], iterationsize: int = 100, cascade_delete: bool = False) GraphJob

Create a cleanup job from a CartographyNodeSchema.

This class method generates a cleanup job that removes stale nodes and relationships based on the provided node schema configuration. It automatically creates the necessary cleanup queries and validates that all required parameters are provided.

For a given node, the fields used in the node_schema.sub_resource_relationship.target_node_matcher.keys() must be provided as keys and values in the parameters dict.

Parameters:
  • node_schema (CartographyNodeSchema) – The node schema object defining the structure and relationships of nodes to clean up.

  • parameters (Dict[str, Any]) – Parameters for the cleanup queries. Must include all parameters required by the generated queries. Common parameters include UPDATE_TAG and sub-resource identifiers.

  • iterationsize (int, optional) – The number of items to process in each iteration. Defaults to 100.

  • cascade_delete (bool) – If True, also delete all child nodes that have a relationship to stale nodes matching node_schema.sub_resource_relationship.rel_label. Defaults to False to preserve existing behavior.

Returns:

A new GraphJob instance configured for cleanup operations.

Return type:

GraphJob

Raises:

ValueError – If the provided parameters don’t match the expected parameters for the cleanup queries.
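A minimal sketch of this kind of parameter check is shown below. It assumes that "don't match" means an expected query parameter is missing, and that LIMIT_SIZE is supplied automatically from iterationsize rather than by the caller; check_parameters is a hypothetical name, not a cartography function.

```python
from typing import Any, Dict, Set


def check_parameters(expected: Set[str], provided: Dict[str, Any]) -> None:
    """Hypothetical check: every expected query parameter must be provided."""
    # Assumption: LIMIT_SIZE is filled in from iterationsize, not by the caller.
    missing = expected - set(provided) - {"LIMIT_SIZE"}
    if missing:
        raise ValueError(f"Missing parameters: {sorted(missing)}")


# Passes: both caller-supplied parameters are present.
check_parameters(
    {"UPDATE_TAG", "account_id", "LIMIT_SIZE"},
    {"UPDATE_TAG": 1642784400, "account_id": "123456789012"},
)
```

Calling it without account_id would raise ValueError before any query is sent to the database.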

merge_parameters(parameters: Dict) None

Merge parameters into all job statements.

This method distributes the provided parameters to all statements in the job, allowing for centralized parameter management at the job level.

Parameters:

parameters (Dict) – Dictionary of parameters to merge into all statements. Keys should match the parameter names used in the Neo4j queries.

Examples

>>> job = GraphJob("Test job", [statement1, statement2])
>>> job.merge_parameters({"UPDATE_TAG": 1642784400, "account_id": "123456789012"})
>>> # All statements in the job now have access to these parameters

run(neo4j_session: Session) None

Execute the job by running all statements sequentially.

This method executes each statement in the job in order. If any statement fails, the entire job fails and subsequent statements are not executed. Progress is logged at debug and info levels.

Parameters:

neo4j_session (neo4j.Session) – The Neo4j session to use for execution.

Raises:

Exception – Any exception raised by a statement execution is re-raised after logging the error.

Examples

>>> job = GraphJob("Cleanup job", [cleanup_statement])
>>> job.run(neo4j_session)
# Logs: "Starting job 'Cleanup job'."
# Logs: "Finished job cleanup_job" (if short_name is set)

classmethod run_from_json(neo4j_session: Session, blob: str | dict, parameters: Dict, short_name: str | None = None) None

Create and execute a job from a JSON string or dictionary.

This convenience method combines job creation and execution in a single call. It deserializes the JSON, merges parameters, and executes all statements.

Parameters:
  • neo4j_session (neo4j.Session) – The Neo4j session to use for execution.

  • blob (Union[str, dict]) – The JSON string or dictionary containing the job definition. If a string is provided, it will be parsed as JSON. If a dictionary is provided, it will be used directly.

  • parameters (Dict) – Parameters to merge into all job statements.

  • short_name (Optional[str]) – Override the short name for the job.

classmethod run_from_json_file(file_path: str | Path, neo4j_session: Session, parameters: Dict) None

Create and execute a job from a JSON file.

This convenience method combines job creation from file and execution in a single call. It reads the JSON file, merges parameters, and executes all statements.

Parameters:
  • file_path (Union[str, Path]) – The path to the JSON file containing the job definition.

  • neo4j_session (neo4j.Session) – The Neo4j session to use for execution.

  • parameters (Dict) – Parameters to merge into all job statements.

Raises:
  • FileNotFoundError – If the specified file does not exist.

  • json.JSONDecodeError – If the file contains invalid JSON.

class cartography.graph.job.GraphJobJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: JSONEncoder

JSON encoder with support for GraphJob instances.

This custom JSON encoder extends the default JSONEncoder to handle serialization of GraphJob objects. It automatically converts GraphJob instances to their dictionary representation using the as_dict() method.

Note

This encoder only handles GraphJob instances. For other custom objects, it falls back to the default JSONEncoder behavior, which may raise a TypeError for non-serializable objects.

default(obj: Any) Any

Handle serialization of custom objects.

This method is called by the JSON encoder for objects that are not natively serializable. It converts GraphJob instances to dictionaries and delegates other objects to the default encoder.

Parameters:

obj (Any) – The object to be serialized.

Returns:

The serializable representation of the object.

Return type:

Any

Raises:

TypeError – If the object is not a GraphJob and cannot be serialized by the default encoder.
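The delegation pattern described here can be sketched with the standard library alone. TinyJob and TinyJobEncoder are hypothetical stand-ins for GraphJob and GraphJobJSONEncoder, used so the example runs without cartography installed.

```python
import json
from typing import Any, Dict, List


class TinyJob:
    """Hypothetical stand-in for GraphJob with only an as_dict() method."""

    def __init__(self, name: str, statements: List[Dict[str, Any]]) -> None:
        self.name = name
        self.statements = statements

    def as_dict(self) -> Dict[str, Any]:
        return {"name": self.name, "statements": self.statements}


class TinyJobEncoder(json.JSONEncoder):
    """Serialize TinyJob via as_dict(); defer everything else to the base class."""

    def default(self, obj: Any) -> Any:
        if isinstance(obj, TinyJob):
            return obj.as_dict()
        # The base implementation raises TypeError for unsupported objects.
        return super().default(obj)


job = TinyJob("Test job", [{"query": "MATCH (n) RETURN count(n)"}])
encoded = json.dumps(job, cls=TinyJobEncoder, sort_keys=True)
```

Passing the encoder through the cls argument of json.dumps keeps the serialization logic out of calling code.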

cartography.graph.job.get_parameters(queries: List[str]) Set[str]

Extract all parameters from a list of Neo4j queries.

This function analyzes all given Neo4j queries and extracts all parameter identifiers that start with ‘$’. It’s useful for validating that all required parameters are provided before executing a job.

Parameters:

queries (List[str]) – A list of Neo4j queries with parameters indicated by leading ‘$’ (e.g., $node_id, $update_tag).

Returns:

The set of all unique parameters across all given Neo4j queries.

Return type:

Set[str]

Note

This function is commonly used in GraphJob.from_node_schema() to validate that all required parameters are provided for cleanup queries.
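One way to perform this kind of extraction is a regular expression over each query. The sketch below is a hypothetical re-implementation, not the library's code; it assumes parameter names consist of word characters and that the returned names omit the leading '$'.

```python
import re
from typing import List, Set

# Assumption: a parameter is '$' followed by one or more word characters.
PARAM_PATTERN = re.compile(r"\$(\w+)")


def extract_parameters(queries: List[str]) -> Set[str]:
    """Hypothetical helper: collect $name tokens, without the '$'."""
    found: Set[str] = set()
    for query in queries:
        found.update(PARAM_PATTERN.findall(query))
    return found


params = extract_parameters([
    "MATCH (n:TestNode) WHERE n.id = $node_id RETURN n",
    "MATCH (n:TestNode) WHERE n.lastupdated <> $update_tag AND n.id = $node_id DELETE n",
])
```

A pattern this simple would also match a '$' inside a quoted string literal, which is acceptable for a sketch but worth keeping in mind.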

GraphStatement

class cartography.graph.statement.GraphStatement(query: str, parameters: Dict[Any, Any] | None = None, iterative: bool = False, iterationsize: int = 0, parent_job_name: str | None = None, parent_job_sequence_num: int | None = None)

Bases: object

A statement that will run against the cartography graph. Statements can query or update the graph.

This class encapsulates a Neo4j Cypher query along with its parameters and execution configuration. It supports both regular and iterative execution modes, and provides comprehensive logging and statistics tracking.

query

The Cypher query to run.

Type:

str

parameters

The parameters to use for the query.

Type:

Dict[Any, Any]

iterative

Whether the statement is iterative. If True, the statement is run repeatedly in chunks of size iterationsize until the query reports no further updates.

Type:

bool

iterationsize

The size of each chunk for iterative statements.

Type:

int

parent_job_name

The name of the parent job this statement belongs to.

Type:

Optional[str]

parent_job_sequence_num

The sequence number of this statement within the parent job. This is used for logging and tracking purposes.

Type:

Optional[int]

Examples

>>> # Simple query statement
>>> stmt = GraphStatement("MATCH (n:User) RETURN count(n)")
>>> stmt.run(session)
>>> # Statement with parameters
>>> stmt = GraphStatement(
...     "MATCH (n:User) WHERE n.name = $name RETURN n",
...     parameters={"name": "Alice"}
... )
>>> # Iterative statement for large datasets
>>> stmt = GraphStatement(
...     "MATCH (n:User) WHERE n.processed IS NULL WITH n LIMIT $LIMIT_SIZE SET n.processed = true",
...     iterative=True,
...     iterationsize=1000
... )

Note

For iterative statements, the query should include a LIMIT clause using the $LIMIT_SIZE parameter, which is automatically set to the iterationsize value.

__init__(query: str, parameters: Dict[Any, Any] | None = None, iterative: bool = False, iterationsize: int = 0, parent_job_name: str | None = None, parent_job_sequence_num: int | None = None)

Initialize a GraphStatement instance.

Parameters:
  • query (str) – The Cypher query to execute.

  • parameters (Optional[Dict[Any, Any]], optional) – Parameters to pass to the query. Defaults to None, which is treated as an empty dict.

  • iterative (bool, optional) – Whether to run the statement iteratively in chunks. Defaults to False.

  • iterationsize (int, optional) – Size of each chunk for iterative execution. Defaults to 0. Must be > 0 if iterative=True.

  • parent_job_name (Optional[str], optional) – Name of the parent job for logging. Defaults to None.

  • parent_job_sequence_num (Optional[int], optional) – Sequence number within the parent job. Defaults to None, in which case it is set to 1.

as_dict() Dict[str, Any]

Convert statement to a dictionary representation.

This method serializes the GraphStatement instance into a dictionary containing all the essential information needed to recreate the statement.

Returns:

A dictionary representation of the statement containing:
  • query: The Cypher query string

  • parameters: The query parameters

  • iterative: Boolean indicating if the statement is iterative

  • iterationsize: The chunk size for iterative execution

Return type:

Dict[str, Any]

Examples

>>> stmt = GraphStatement(
...     "MATCH (n) RETURN count(n)",
...     parameters={"limit": 10},
...     iterative=True,
...     iterationsize=1000
... )
>>> stmt_dict = stmt.as_dict()
>>> # Returns:
>>> # {
>>> #     "query": "MATCH (n) RETURN count(n)",
>>> #     "parameters": {"limit": 10, "LIMIT_SIZE": 1000},
>>> #     "iterative": True,
>>> #     "iterationsize": 1000
>>> # }

Note

This method is used by GraphStatementJSONEncoder for JSON serialization. Parent job information is not included in the dictionary representation.

classmethod create_from_json(json_obj: Dict[str, Any], short_job_name: str | None = None, job_sequence_num: int | None = None)

Create a GraphStatement instance from a JSON object.

This class method constructs a GraphStatement from a dictionary representation, typically loaded from a JSON file. It provides a convenient way to deserialize stored statement configurations.

Parameters:
  • json_obj (Dict[str, Any]) – The JSON object containing statement configuration. Expected keys: “query”, “parameters”, “iterative”, “iterationsize”.

  • short_job_name (Optional[str], optional) – The short name of the job this statement belongs to, used for logging and naming. Defaults to None.

  • job_sequence_num (Optional[int], optional) – The sequence number of this statement within the job, used for logging and tracking. Defaults to None.

Returns:

A new GraphStatement instance created from the JSON object.

Return type:

GraphStatement

Note

Missing keys in the JSON object will use default values:
  • query: “” (empty string)

  • parameters: {} (empty dict)

  • iterative: False

  • iterationsize: 0
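The documented defaults map naturally onto dict.get. The following is a minimal sketch under that assumption; statement_fields is a hypothetical helper and does not construct a real GraphStatement.

```python
from typing import Any, Dict


def statement_fields(json_obj: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper mirroring the documented defaults for missing keys."""
    return {
        "query": json_obj.get("query", ""),
        "parameters": json_obj.get("parameters", {}),
        "iterative": json_obj.get("iterative", False),
        "iterationsize": json_obj.get("iterationsize", 0),
    }


empty = statement_fields({})
partial = statement_fields({"query": "MATCH (n) RETURN n", "iterative": True, "iterationsize": 100})
```

An empty object therefore produces a non-iterative statement with an empty query, so callers may want to check the query themselves before running it.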

classmethod create_from_json_file(file_path: Path)

Create a GraphStatement instance from a JSON file.

This class method reads a JSON file and creates a GraphStatement instance from its contents. It’s a convenient way to load statement configurations from external files.

Parameters:

file_path (Path) – The path to the JSON file to read.

Returns:

A new GraphStatement instance created from the JSON file.

Return type:

GraphStatement

Raises:
  • FileNotFoundError – If the specified file does not exist.

  • json.JSONDecodeError – If the file contains invalid JSON.

  • PermissionError – If the file cannot be read due to permissions.

Note

The job short name is automatically extracted from the filename (without extension) using the get_job_shortname() utility function. The sequence number defaults to None.

merge_parameters(parameters: Dict) None

Merge given parameters with existing parameters.

This method updates the statement’s parameters by merging the provided parameters with the existing ones. New parameters will override existing ones with the same key.

Parameters:

parameters (Dict) – The parameters to merge into the existing parameters.

Examples

>>> stmt = GraphStatement("MATCH (n) WHERE n.id = $id RETURN n", {"id": 1})
>>> stmt.merge_parameters({"limit": 10, "id": 2})
>>> # stmt.parameters is now {"id": 2, "limit": 10, "LIMIT_SIZE": 0}

Note

This method creates a copy of the existing parameters before merging to avoid modifying the original dictionary reference.
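The copy-then-merge behavior can be sketched with plain dictionaries. The merged function is a hypothetical illustration of the semantics described above, not the method's actual body.

```python
from typing import Any, Dict


def merged(existing: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
    """Return a merged copy; keys in `new` override keys in `existing`."""
    result = existing.copy()  # shallow copy, so `existing` is left untouched
    result.update(new)
    return result


original = {"id": 1, "LIMIT_SIZE": 0}
combined = merged(original, {"limit": 10, "id": 2})
```

Because the merge works on a copy, a dictionary shared between several statements is not changed when one of them merges new values.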

run(session: Session) None

Run the statement. This will execute the query against the graph.

This method executes the Cypher query using either iterative or non-iterative execution based on the statement configuration. It handles logging and statistics collection automatically.

Parameters:

session (neo4j.Session) – The Neo4j session to use for executing the query.

Examples

>>> stmt = GraphStatement("MATCH (n:User) RETURN count(n)")
>>> stmt.run(session)
>>> # Query executed and results processed
>>> # Iterative execution
>>> stmt = GraphStatement(
...     "MATCH (n:User) WHERE n.processed IS NULL WITH n LIMIT $LIMIT_SIZE SET n.processed = true",
...     iterative=True,
...     iterationsize=1000
... )
>>> stmt.run(session)
>>> # Query executed in chunks of 1000 until no more updates

Note

For iterative statements, the method will continue running until the query returns no updates (summary.counters.contains_updates is False). Completion is logged with the parent job name and sequence number.
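The stopping rule can be illustrated without a database. In the sketch below, FakeGraph is a hypothetical in-memory stand-in for the graph, and the boolean returned by process_chunk plays the role of summary.counters.contains_updates; run_iteratively is likewise an assumed name.

```python
from typing import Callable, Dict


def run_iteratively(run_chunk: Callable[[Dict[str, int]], bool], iterationsize: int) -> int:
    """Call run_chunk until it reports no updates; return the number of calls."""
    parameters = {"LIMIT_SIZE": iterationsize}
    calls = 0
    while True:
        calls += 1
        contains_updates = run_chunk(parameters)
        if not contains_updates:
            return calls


class FakeGraph:
    """Hypothetical in-memory stand-in for a set of unprocessed nodes."""

    def __init__(self, pending: int) -> None:
        self.pending = pending

    def process_chunk(self, parameters: Dict[str, int]) -> bool:
        batch = min(self.pending, parameters["LIMIT_SIZE"])
        self.pending -= batch
        return batch > 0  # True only if this chunk changed something


graph = FakeGraph(pending=2500)
calls = run_iteratively(graph.process_chunk, iterationsize=1000)
```

The loop always makes one final call that changes nothing, which is how it learns that the work is done. A query whose updates never run out would keep such a loop going indefinitely.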

class cartography.graph.statement.GraphStatementJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: JSONEncoder

Support JSON serialization for GraphStatement instances.

This custom JSON encoder extends the default JSONEncoder to handle GraphStatement objects by converting them to dictionaries using their as_dict() method.

Note

This encoder only handles GraphStatement objects. All other objects are passed to the default JSONEncoder, which may raise TypeError for non-serializable objects.

default(obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)

cartography.graph.statement.get_job_shortname(file_path: Path | str) str

Extract the short name from a file path by removing the extension.

This utility function takes a file path and returns the path without the file extension. Note that the directory path is preserved.

Parameters:

file_path (Union[Path, str]) – The file path to process, can be a Path object or string.

Returns:

The file path without extension.

Return type:

str

Examples

>>> get_job_shortname("/path/to/my_job.json")
'/path/to/my_job'
>>> get_job_shortname("config.yaml")
'config'
>>> get_job_shortname(Path("/jobs/data_sync.py"))
'/jobs/data_sync'

Note

This function is planned to be moved to cartography.util after refactoring of the run_*_job functions to cartography.graph.job.
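The documented behavior matches what os.path.splitext does, so an equivalent can be sketched as follows. strip_extension is a hypothetical name, and using splitext is an assumption about how such a helper could be written.

```python
import os
from pathlib import Path
from typing import Union


def strip_extension(file_path: Union[Path, str]) -> str:
    """Hypothetical equivalent: drop the final extension, keep the directory."""
    root, _extension = os.path.splitext(str(file_path))
    return root


short = strip_extension("/path/to/my_job.json")
double = strip_extension("archive.tar.gz")
```

Only the last extension is removed, so a name such as archive.tar.gz becomes archive.tar rather than archive.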

Context

Context management for synchronization jobs.

This module provides context objects with well-documented fields that can be passed between different parts of the sync job and used to get parameters for job queries. The context system allows for clean parameter passing and enables module-specific context extensions through subclassing.

The general workflow is:
  1. Create a base Context with common parameters (like update_tag).

  2. Pass it to sync modules that need access to these parameters.

  3. Optionally extend it with module-specific context using subclassing.

Note

The context pattern promotes clean separation of concerns by allowing each module to access only the parameters it needs while maintaining a consistent interface for parameter passing throughout the sync pipeline.

class cartography.graph.context.Context(update_tag)

Bases: object

Base context class for synchronization job parameters.

This class provides a standardized way to pass parameters between different parts of the sync job. It can be subclassed to create module-specific contexts that extend the base functionality with additional fields.

update_tag

A timestamp or identifier used to track when data was last updated. This is typically used in Neo4j queries to identify and clean up stale data.

Examples

Basic usage:

>>> ctx = Context(update_tag=1642784400)
>>> print(ctx.update_tag)
1642784400

Creating a module-specific context:

>>> class DatabaseContext(Context):
...     def __init__(self, update_tag, db_host, db_port):
...         super().__init__(update_tag)
...         self.db_host = db_host
...         self.db_port = db_port
...
...     @classmethod
...     def from_context(cls, context, db_host, db_port):
...         return cls(context.update_tag, db_host, db_port)
>>> base_ctx = Context(update_tag=1642784400)
>>> db_ctx = DatabaseContext.from_context(base_ctx, "localhost", 5432)
>>> print(db_ctx.update_tag, db_ctx.db_host, db_ctx.db_port)
1642784400 localhost 5432

Note

Subclasses should implement a from_context class method to enable easy conversion from a base context to the specialized context. This pattern maintains consistency across different context types.

__init__(update_tag)

Initialize a new Context instance.

Parameters:

update_tag – A timestamp or identifier used to track data freshness. This is typically used in cleanup queries to identify stale data.