DriftDetect

CLI

cartography.driftdetect.cli.configure_get_state_neo4j(config)

Extra configuration options for neo4j interaction.

Parameters:

config (Config Object)

Return type:

Config Object

Returns:

The config object.

cartography.driftdetect.cli.main(argv=None)

Entrypoint for the default cartography command line interface.

Return type:

int

Returns:

The return code.

Config

class cartography.driftdetect.config.AddShortcutConfig(query_directory: str, shortcut: str, filename: str)

Bases: object

A common interface for the drift-detection add-shortcut configuration.

All fields defined on this class must be present on a configuration object. Fields documented as required must contain valid values.

Parameters:
  • query_directory (string) – Path to query directory. Required.

  • shortcut (string) – Name of shortcut to access the file. Required.

  • filename (string) – Filename (without the directory prefix) of the state to be shortcut. Required.

__init__(query_directory: str, shortcut: str, filename: str)
class cartography.driftdetect.config.GetDriftConfig(query_directory: str, start_state: str, end_state: str)

Bases: object

A common interface for the drift-detection get-drift configuration.

All fields defined on this class must be present on a configuration object. Fields documented as required must contain valid values.

Parameters:
  • query_directory (string) – Path to query directory. Required.

  • start_state (string) – Filename (without the directory prefix) of the earlier state to be compared with. Required.

  • end_state (string) – Filename (without the directory prefix) of the later state to be compared with. Required.

__init__(query_directory: str, start_state: str, end_state: str)
class cartography.driftdetect.config.UpdateConfig(drift_detection_directory: str, neo4j_uri: str, neo4j_user: str | None = None, neo4j_password: str | None = None)

Bases: object

A common interface for the drift-detection update configuration.

All fields defined on this class must be present on a configuration object. Fields documented as required must contain valid values. Fields documented as optional may contain None, in which case drift-detection will choose a sensible default value for that piece of configuration.

Parameters:
  • drift_detection_directory (string) – Path to drift detection directory. Required.

  • neo4j_uri (string) – URI for a Neo4j graph database service. Required.

  • neo4j_user (string) – User name for a Neo4j graph database service. Optional.

  • neo4j_password (string) – Password for a Neo4j graph database service. Optional.

__init__(drift_detection_directory: str, neo4j_uri: str, neo4j_user: str | None = None, neo4j_password: str | None = None)

Util

cartography.driftdetect.util.valid_directory(directory)

Error handling for validating directory.

Parameters:

directory (string) – Path to directory.

Returns:
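The entry above does not say what the validator returns or raises. As a rough sketch only, a directory validator of this kind is often written as an argparse `type` callable. The name `valid_directory_sketch`, the error message, and the `--query-directory` flag below are assumptions for illustration, not the documented behavior of `valid_directory`.

```python
import argparse
import os


def valid_directory_sketch(directory: str) -> str:
    """Hypothetical stand-in: return the path if it is an existing directory."""
    if not os.path.isdir(directory):
        # argparse reports this exception as a usage error for the argument.
        raise argparse.ArgumentTypeError(f"Error reading directory {directory}")
    return directory


# Illustrative parser; the flag name is an assumption, not the real CLI.
parser = argparse.ArgumentParser(prog="drift-sketch")
parser.add_argument("--query-directory", type=valid_directory_sketch)
args = parser.parse_args(["--query-directory", os.getcwd()])
print(args.query_directory)
```

Using the validator as the argument `type` means bad paths are rejected while arguments are parsed, before any drift-detection work starts.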

States

class cartography.driftdetect.model.State(name: str, validation_query: str, properties: List[str], results: List[List[str]])

Bases: object

The default object which stores query information.

Parameters:
  • name (String) – Name of the query.

  • validation_query (String) – Actual Cypher query being run.

  • properties (List of Strings) – List of keys, in the order the Cypher query returns them.

  • results (List of List of Strings) – List of all results of running the validation query.

__init__(name: str, validation_query: str, properties: List[str], results: List[List[str]])
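To make the relationship between `properties` and `results` concrete, here is a minimal stand-in that mirrors the four documented fields. `StateSketch`, the example query, and the sample rows are hypothetical; the real `State` class may carry additional behavior.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class StateSketch:
    """Hypothetical mirror of the documented State fields."""
    name: str
    validation_query: str
    properties: List[str]
    results: List[List[str]] = field(default_factory=list)


state = StateSketch(
    name="example-hosts",  # made-up query name
    validation_query="MATCH (n:Host) RETURN n.name, n.ip",  # made-up query
    properties=["n.name", "n.ip"],
    results=[["web-1", "10.0.0.1"], ["db-1", "10.0.0.2"]],
)

# Each result row lines up positionally with the property keys.
first_row = dict(zip(state.properties, state.results[0]))
print(first_row)
```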
cartography.driftdetect.get_states.get_query_state(session: Session, query_directory: str, state_serializer: StateSchema, storage, filename: str) -> State

Gets the most recent state of a query.

Parameters:
  • session (neo4j session.) – neo4j session to connect to.

  • query_directory (String.) – Path to query directory.

  • state_serializer (Schema) – Schema to serialize and deserialize states.

  • storage (Storage Object.) – Storage object that supports loading, writing, and walking.

  • filename (String.) – Path to filename.

Returns:

The created state.

cartography.driftdetect.get_states.get_state(session: Session, state: State) -> None

Connects to a neo4j session, runs the validation query, then saves the results to a state.

Parameters:
  • session (neo4j session) – Graph session to pull infrastructure information from.

  • state (State) – State to be updated.

Returns:
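The description above (run the validation query, then save the rows on the state) can be sketched without a database. `FakeSession` stands in for a neo4j session and only imitates the `run(query)` call returning records indexable by key; `StateSketch` and `get_state_sketch` are likewise hypothetical, and the real function may treat values differently.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, List


@dataclass
class StateSketch:
    """Hypothetical mirror of the documented State fields."""
    name: str
    validation_query: str
    properties: List[str]
    results: List[List[str]] = field(default_factory=list)


class FakeSession:
    """Stand-in for a neo4j session: run() yields dict-like records."""

    def __init__(self, records: List[Dict[str, str]]) -> None:
        self._records = records

    def run(self, query: str) -> Iterator[Dict[str, str]]:
        # A real session would send the query to the database.
        return iter(self._records)


def get_state_sketch(session, state: StateSketch) -> None:
    # Keep one row per record, with columns ordered by state.properties.
    state.results = [
        [str(record[key]) for key in state.properties]
        for record in session.run(state.validation_query)
    ]


session = FakeSession([
    {"n.name": "web-1", "n.ip": "10.0.0.1"},
    {"n.name": "db-1", "n.ip": "10.0.0.2"},
])
state = StateSketch("example-hosts", "MATCH (n:Host) RETURN n.name, n.ip", ["n.name", "n.ip"])
get_state_sketch(session, state)
print(state.results)
```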

cartography.driftdetect.get_states.run_get_states(config: UpdateConfig) -> None

Handles neo4j errors and then updates detectors.

Parameters:

config (Config Object) – Config Object from CLI

Returns:

Storage

class cartography.driftdetect.storage.FileSystem

Bases: object

classmethod has_file(filename)

Determines whether or not file exists.

Parameters:

filename (string) – filepath

Returns:

Bool

classmethod load(file_path)

Loads a JSON object (dict) from a file.

Parameters:

file_path (string.) – Filepath for the file.

Returns:

Dictionary in JSON format.

classmethod walk(drift_detection_directory)

Enables walking through drift detection.

Parameters:

drift_detection_directory (String.) – Path to drift detection directory.

Yields:

query directory.

classmethod write(data, file_path)

Writes a JSON object (dict) to a file.

Parameters:
  • data (Dict) – Dictionary in JSON format.

  • file_path (string) – Filepath to be written to.
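The four class methods above describe a small JSON-on-disk storage interface. The sketch below shows one way such an interface could be written with the standard library. `FileSystemSketch` is a hypothetical stand-in; in particular, its `walk` yields only the immediate subdirectories, which may differ from how the real method traverses the drift detection directory.

```python
import json
import os
import tempfile


class FileSystemSketch:
    """Hypothetical stand-in for a JSON file storage backend."""

    @classmethod
    def has_file(cls, filename):
        return os.path.isfile(filename)

    @classmethod
    def load(cls, file_path):
        with open(file_path) as handle:
            return json.load(handle)

    @classmethod
    def write(cls, data, file_path):
        with open(file_path, "w") as handle:
            json.dump(data, handle, indent=2)

    @classmethod
    def walk(cls, drift_detection_directory):
        # Yield each immediate subdirectory as one query directory.
        for entry in sorted(os.listdir(drift_detection_directory)):
            path = os.path.join(drift_detection_directory, entry)
            if os.path.isdir(path):
                yield path


with tempfile.TemporaryDirectory() as root:
    query_dir = os.path.join(root, "example-query")
    os.mkdir(query_dir)
    state_file = os.path.join(query_dir, "state.json")
    FileSystemSketch.write({"name": "example"}, state_file)
    found = FileSystemSketch.has_file(state_file)
    loaded = FileSystemSketch.load(state_file)
    query_dirs = [os.path.basename(p) for p in FileSystemSketch.walk(root)]

print(found, loaded, query_dirs)
```

Keeping storage behind class methods like these lets the rest of the code accept any object with the same four methods, which is presumably why several functions in this reference take a `storage` parameter.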

Shortcut

class cartography.driftdetect.shortcut.Shortcut(name, shortcuts)

Bases: object

Interface for ReportInfo Class.

Parameters:
  • name (String) – Name of query

  • shortcuts (Dictionary) – Dictionary of Shortcuts to Filenames

__init__(name, shortcuts)
cartography.driftdetect.add_shortcut.add_shortcut(storage, shortcut_serializer, query_directory, alias, filename)

Adds a shortcut to the Report_Info File. If a shortcut already exists for an alias, it replaces that shortcut.

Parameters:
  • storage (Storage Object.) – Type of Storage System.

  • shortcut_serializer (Shortcut Schema.) – Shortcut Serializer. Should serialize and deserialize between JSON and Shortcut Object

  • query_directory (String.) – Path to Query Directory.

  • alias (String.) – Alias for the file.

  • filename (String.) – Name of file or shortcut to that file.

Returns:
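The replace-on-conflict rule, and the note that `filename` may itself be a shortcut, can be illustrated on a plain dictionary. This is a sketch under assumptions: `add_shortcut_sketch` is hypothetical, works on an in-memory mapping, and skips the storage and serializer steps that the real function takes as parameters.

```python
from typing import Dict


def add_shortcut_sketch(shortcuts: Dict[str, str], alias: str, filename: str) -> None:
    """Hypothetical sketch: point `alias` at a file, replacing any old target."""
    # If `filename` is itself a known alias, resolve it to the underlying file.
    target = shortcuts.get(filename, filename)
    shortcuts[alias] = target


# Made-up file names for illustration.
shortcuts = {"latest": "1700000000.json"}
add_shortcut_sketch(shortcuts, "baseline", "latest")          # alias of an alias
add_shortcut_sketch(shortcuts, "latest", "1700000500.json")   # replaces old target
print(shortcuts)
```

Resolving the alias at the time it is added means `baseline` keeps pointing at the original file even after `latest` is moved.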

cartography.driftdetect.add_shortcut.run_add_shortcut(config)

Runs add_shortcut from the command line. Does error handling.

Parameters:

config (Config Object) – Config of adding shortcut

Returns:

Reporter

cartography.driftdetect.reporter.report_drift(new_results, missing_results, state_name, state_properties)

Prints the results between two states.

Parameters:
  • new_results – List of new results.

  • missing_results – List of missing results.

  • state_name – Query Name.

  • state_properties – Query Properties.

Returns:

None.

cartography.driftdetect.reporter.report_drift_missing(results, state_properties)

Prints missing results in Query Results between two states.

Parameters:

results (List of List of Strings.) – Deviation information.

Returns:

None

cartography.driftdetect.reporter.report_drift_new(results, state_properties)

Prints new additions in Query Results between two states.

Parameters:

results (List of List of Strings.) – Deviation information.

Returns:

None

Serializers

class cartography.driftdetect.serializers.ShortcutSchema(*, only: Sequence[str] | AbstractSet[str] | None = None, exclude: Sequence[str] | AbstractSet[str] = (), many: bool | None = None, load_only: Sequence[str] | AbstractSet[str] = (), dump_only: Sequence[str] | AbstractSet[str] = (), partial: bool | Sequence[str] | AbstractSet[str] | None = None, unknown: Literal['exclude', 'include', 'raise'] | None = None)

Bases: Schema

Schema to serialize and deserialize Shortcuts from JSON.

make_misc(data, **kwargs)
opts: Any = <marshmallow.schema.SchemaOpts object>
class cartography.driftdetect.serializers.StateSchema(*, only: Sequence[str] | AbstractSet[str] | None = None, exclude: Sequence[str] | AbstractSet[str] = (), many: bool | None = None, load_only: Sequence[str] | AbstractSet[str] = (), dump_only: Sequence[str] | AbstractSet[str] = (), partial: bool | Sequence[str] | AbstractSet[str] | None = None, unknown: Literal['exclude', 'include', 'raise'] | None = None)

Bases: Schema

Schema to serialize and deserialize DriftStates from JSON.

make_state(data, **kwargs)
opts: Any = <marshmallow.schema.SchemaOpts object>

Detect deviations

cartography.driftdetect.detect_deviations.compare_states(start_state: State, end_state: State)

Helper function for comparing differences between two States.

Parameters:
  • start_state (State) – The earlier state chronologically to be compared to.

  • end_state (State) – The later state chronologically to be compared to.

Returns:

list of tuples of differences between states in the form (dictionary, State object)

cartography.driftdetect.detect_deviations.perform_drift_detection(start_state: State, end_state: State)

Returns differences (additions and missing results) between two States.

Parameters:
  • start_state (State) – The earlier state chronologically to be compared to.

  • end_state (State) – The later state chronologically to be compared to.

Returns:

tuple of additions and subtractions between the end and start detector in the form of drift_info_detector pairs
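The idea of "additions and missing results" between two states can be sketched as a set difference over result rows. `drift_sketch` and the sample rows are hypothetical, and the return shape here (two lists of rows) is a simplification rather than the pairs the real functions return.

```python
from typing import List, Tuple


def drift_sketch(
    start_results: List[List[str]],
    end_results: List[List[str]],
) -> Tuple[List[List[str]], List[List[str]]]:
    """Hypothetical sketch: rows only in the end state, rows only in the start state."""
    start = {tuple(row) for row in start_results}
    end = {tuple(row) for row in end_results}
    new_rows = [list(row) for row in sorted(end - start)]
    missing_rows = [list(row) for row in sorted(start - end)]
    return new_rows, missing_rows


start_rows = [["web-1", "10.0.0.1"], ["db-1", "10.0.0.2"]]
end_rows = [["web-1", "10.0.0.1"], ["db-2", "10.0.0.3"]]
new_rows, missing_rows = drift_sketch(start_rows, end_rows)
print(new_rows, missing_rows)
```

Rows are converted to tuples because lists are not hashable and so cannot be stored in a set.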

cartography.driftdetect.detect_deviations.run_drift_detection(config: GetDriftConfig) -> None