Skip to content

Storage Adapters

jnkn.core.storage

Storage adapters for jnkn.

Provides pluggable persistence backends:

- `SQLiteStorage`: production-ready local persistence
- `MemoryStorage`: fast ephemeral storage for testing

Classes

MemoryStorage

Bases: StorageAdapter

Ephemeral in-memory storage.

Useful for unit testing, CI pipelines, and development.

Source code in src/jnkn/core/storage/memory.py
class MemoryStorage(StorageAdapter):
    """
    Ephemeral in-memory storage.

    Useful for unit testing, CI pipelines, and development.

    Nodes, edges, and scan metadata live in plain dicts, with secondary
    indices (node ids by type, edge keys by source / by target) kept in
    sync so per-item lookups and deletions stay O(1). Nothing survives
    process exit.
    """

    def __init__(self):
        # Primary stores: id -> object.
        self._nodes: Dict[str, Node] = {}
        self._edges: Dict[str, Edge] = {}
        self._scan_metadata: Dict[str, ScanMetadata] = {}
        # Secondary indices, kept in sync with the stores above.
        self._nodes_by_type: Dict[NodeType, Set[str]] = defaultdict(set)
        self._edges_by_source: Dict[str, Set[str]] = defaultdict(set)
        self._edges_by_target: Dict[str, Set[str]] = defaultdict(set)

    def _edge_key(self, source: str, target: str, edge_type: str) -> str:
        """Generate unique key for an edge."""
        return f"{source}|{target}|{edge_type}"

    def save_node(self, node: Node) -> None:
        """Persist a single node (upsert by id)."""
        previous = self._nodes.get(node.id)
        if previous is not None and previous.type != node.type:
            # Fix: re-saving a node under a new type used to leave its id
            # behind in the old type's bucket, corrupting _nodes_by_type.
            self._nodes_by_type[previous.type].discard(node.id)
        self._nodes[node.id] = node
        self._nodes_by_type[node.type].add(node.id)

    def save_nodes_batch(self, nodes: List[Node]) -> int:
        """Persist multiple nodes. Returns the number saved."""
        for node in nodes:
            self.save_node(node)
        return len(nodes)

    def load_node(self, node_id: str) -> Node | None:
        """Load a node by ID, or None if absent."""
        return self._nodes.get(node_id)

    def load_all_nodes(self) -> List[Node]:
        """Load all nodes."""
        return list(self._nodes.values())

    def delete_node(self, node_id: str) -> bool:
        """Delete a node and every edge that references it.

        Returns:
            True if the node existed and was deleted, False otherwise.
        """
        if node_id not in self._nodes:
            return False

        node = self._nodes.pop(node_id)
        self._nodes_by_type[node.type].discard(node_id)

        # Outgoing edges: drop from the store and the target index.
        for edge_key in list(self._edges_by_source.get(node_id, set())):
            edge = self._edges.pop(edge_key, None)
            if edge is not None:
                self._edges_by_target[edge.target_id].discard(edge_key)

        # Incoming edges: drop from the store and the source index.
        for edge_key in list(self._edges_by_target.get(node_id, set())):
            edge = self._edges.pop(edge_key, None)
            if edge is not None:
                self._edges_by_source[edge.source_id].discard(edge_key)

        # Finally remove the node's own index buckets.
        self._edges_by_source.pop(node_id, None)
        self._edges_by_target.pop(node_id, None)

        return True

    def delete_nodes_by_file(self, file_path: str) -> int:
        """Delete all nodes whose ``path`` equals *file_path*.

        Edges are cleaned up by ``delete_node``. Returns the count removed.
        """
        node_ids = [node_id for node_id, node in self._nodes.items() if node.path == file_path]
        for node_id in node_ids:
            self.delete_node(node_id)
        return len(node_ids)

    def save_edge(self, edge: Edge) -> None:
        """Persist a single edge (upsert by composite key)."""
        key = self._edge_key(edge.source_id, edge.target_id, edge.type.value)
        self._edges[key] = edge
        self._edges_by_source[edge.source_id].add(key)
        self._edges_by_target[edge.target_id].add(key)

    def save_edges_batch(self, edges: List[Edge]) -> int:
        """Persist multiple edges. Returns the number saved."""
        for edge in edges:
            self.save_edge(edge)
        return len(edges)

    def load_all_edges(self) -> List[Edge]:
        """Load all edges."""
        return list(self._edges.values())

    def delete_edges_by_source(self, source_id: str) -> int:
        """Delete all edges originating at *source_id*. Returns the count."""
        count = 0
        for edge_key in list(self._edges_by_source.get(source_id, set())):
            edge = self._edges.pop(edge_key, None)
            if edge is not None:
                self._edges_by_target[edge.target_id].discard(edge_key)
                count += 1
        self._edges_by_source.pop(source_id, None)
        return count

    def load_graph(self) -> DependencyGraph:
        """Hydrate a DependencyGraph from all stored nodes and edges."""
        graph = DependencyGraph()
        for node in self._nodes.values():
            graph.add_node(node)
        for edge in self._edges.values():
            graph.add_edge(edge)
        return graph

    def query_descendants(self, node_id: str, max_depth: int = -1) -> List[str]:
        """Query descendants via BFS over outgoing edges.

        Args:
            node_id: Starting node id.
            max_depth: Stop expanding past this depth; negative = unlimited.

        Returns:
            Ids of all transitively reachable targets (order unspecified).
        """
        visited: Set[str] = set()
        queue: List[Tuple[str, int]] = [(node_id, 0)]
        # Consume via a moving head index: list.pop(0) is O(n) per dequeue
        # and made the traversal quadratic on large graphs.
        head = 0

        while head < len(queue):
            current, depth = queue[head]
            head += 1

            if max_depth >= 0 and depth >= max_depth:
                continue

            for edge_key in self._edges_by_source.get(current, set()):
                edge = self._edges.get(edge_key)
                if edge and edge.target_id not in visited:
                    visited.add(edge.target_id)
                    queue.append((edge.target_id, depth + 1))

        return list(visited)

    def query_ancestors(self, node_id: str, max_depth: int = -1) -> List[str]:
        """Query ancestors via BFS over incoming edges.

        Args:
            node_id: Starting node id.
            max_depth: Stop expanding past this depth; negative = unlimited.

        Returns:
            Ids of all transitive ancestors (order unspecified).
        """
        visited: Set[str] = set()
        queue: List[Tuple[str, int]] = [(node_id, 0)]
        # Same head-index dequeue as query_descendants (avoids O(n) pop(0)).
        head = 0

        while head < len(queue):
            current, depth = queue[head]
            head += 1

            if max_depth >= 0 and depth >= max_depth:
                continue

            for edge_key in self._edges_by_target.get(current, set()):
                edge = self._edges.get(edge_key)
                if edge and edge.source_id not in visited:
                    visited.add(edge.source_id)
                    queue.append((edge.source_id, depth + 1))

        return list(visited)

    def save_scan_metadata(self, metadata: ScanMetadata) -> None:
        """Save scan metadata, keyed by its file path (upsert)."""
        self._scan_metadata[metadata.file_path] = metadata

    def get_scan_metadata(self, file_path: str) -> ScanMetadata | None:
        """Get scan metadata for a file, or None if never scanned."""
        return self._scan_metadata.get(file_path)

    def get_all_scan_metadata(self) -> List[ScanMetadata]:
        """Get all scan metadata."""
        return list(self._scan_metadata.values())

    def delete_scan_metadata(self, file_path: str) -> bool:
        """Delete scan metadata. Returns True if an entry was removed."""
        if file_path in self._scan_metadata:
            del self._scan_metadata[file_path]
            return True
        return False

    def get_schema_version(self) -> int:
        """Memory storage doesn't have schema versions."""
        return 0

    def get_stats(self) -> Dict[str, Any]:
        """Get storage statistics (node/edge/tracked-file counts)."""
        return {
            "total_nodes": len(self._nodes),
            "total_edges": len(self._edges),
            "tracked_files": len(self._scan_metadata),
        }

    def clear(self) -> None:
        """Clear all data and all secondary indices."""
        self._nodes.clear()
        self._edges.clear()
        self._scan_metadata.clear()
        self._nodes_by_type.clear()
        self._edges_by_source.clear()
        self._edges_by_target.clear()

    def close(self) -> None:
        """No-op for memory storage."""
        pass
Functions
clear()

Clear all data.

Source code in src/jnkn/core/storage/memory.py
def clear(self) -> None:
    """Reset the adapter to its freshly-constructed, empty state."""
    containers = (
        self._nodes,
        self._edges,
        self._scan_metadata,
        self._nodes_by_type,
        self._edges_by_source,
        self._edges_by_target,
    )
    for container in containers:
        container.clear()
close()

No-op for memory storage.

Source code in src/jnkn/core/storage/memory.py
def close(self) -> None:
    """No-op for memory storage.

    There is no underlying resource (file handle, connection) to
    release; the method exists so callers can treat all adapters alike.
    """
    pass
delete_edges_by_source(source_id)

Delete all edges from a source.

Source code in src/jnkn/core/storage/memory.py
def delete_edges_by_source(self, source_id: str) -> int:
    """Delete every edge whose source is *source_id*.

    Returns:
        Number of edges actually removed.
    """
    removed = 0
    for key in list(self._edges_by_source.get(source_id, set())):
        edge = self._edges.pop(key, None)
        if edge is None:
            continue
        self._edges_by_target[edge.target_id].discard(key)
        removed += 1
    self._edges_by_source.pop(source_id, None)
    return removed
delete_node(node_id)

Delete a node and its edges.

Source code in src/jnkn/core/storage/memory.py
def delete_node(self, node_id: str) -> bool:
    """Delete a node and its edges.

    Removes the node from the main store and the type index, then
    removes every edge referencing it (either direction) from the edge
    store and both edge indices.

    Returns:
        True if the node existed and was deleted, False otherwise.
    """
    if node_id not in self._nodes:
        return False

    node = self._nodes[node_id]
    self._nodes_by_type[node.type].discard(node_id)
    del self._nodes[node_id]

    # Outgoing edges: drop from the edge store and the target index.
    for edge_key in list(self._edges_by_source.get(node_id, set())):
        if edge_key in self._edges:
            edge = self._edges[edge_key]
            self._edges_by_target[edge.target_id].discard(edge_key)
            del self._edges[edge_key]

    # Incoming edges: drop from the edge store and the source index.
    for edge_key in list(self._edges_by_target.get(node_id, set())):
        if edge_key in self._edges:
            edge = self._edges[edge_key]
            self._edges_by_source[edge.source_id].discard(edge_key)
            del self._edges[edge_key]

    # Finally remove the node's own (now-drained) index buckets.
    self._edges_by_source.pop(node_id, None)
    self._edges_by_target.pop(node_id, None)

    return True
delete_nodes_by_file(file_path)

Delete all nodes from a file.

Source code in src/jnkn/core/storage/memory.py
def delete_nodes_by_file(self, file_path: str) -> int:
    """Delete all nodes from a file.

    Matches on each node's ``path`` attribute; edges are cleaned up by
    ``delete_node``.

    Returns:
        Number of nodes removed.
    """
    node_ids = [node_id for node_id, node in self._nodes.items() if node.path == file_path]
    for node_id in node_ids:
        self.delete_node(node_id)
    return len(node_ids)
delete_scan_metadata(file_path)

Delete scan metadata.

Source code in src/jnkn/core/storage/memory.py
def delete_scan_metadata(self, file_path: str) -> bool:
    """Drop the scan-metadata entry for *file_path*, if any.

    Returns:
        True when an entry existed and was removed, False otherwise.
    """
    sentinel = object()
    return self._scan_metadata.pop(file_path, sentinel) is not sentinel
get_all_scan_metadata()

Get all scan metadata.

Source code in src/jnkn/core/storage/memory.py
def get_all_scan_metadata(self) -> List[ScanMetadata]:
    """Return scan metadata for every tracked file as a new list."""
    return list(self._scan_metadata.values())
get_scan_metadata(file_path)

Get scan metadata for a file.

Source code in src/jnkn/core/storage/memory.py
def get_scan_metadata(self, file_path: str) -> ScanMetadata | None:
    """Get scan metadata for a file, or None if it was never scanned."""
    return self._scan_metadata.get(file_path)
get_schema_version()

Memory storage doesn't have schema versions.

Source code in src/jnkn/core/storage/memory.py
def get_schema_version(self) -> int:
    """Memory storage doesn't have schema versions.

    Returns:
        Always 0, since there is no persisted schema to migrate.
    """
    return 0
get_stats()

Get storage statistics.

Source code in src/jnkn/core/storage/memory.py
def get_stats(self) -> Dict[str, Any]:
    """Get storage statistics.

    Returns:
        Counts of stored nodes, stored edges, and tracked (scanned) files.
    """
    return {
        "total_nodes": len(self._nodes),
        "total_edges": len(self._edges),
        "tracked_files": len(self._scan_metadata),
    }
load_all_edges()

Load all edges.

Source code in src/jnkn/core/storage/memory.py
def load_all_edges(self) -> List[Edge]:
    """Return every stored edge as a new list."""
    return list(self._edges.values())
load_all_nodes()

Load all nodes.

Source code in src/jnkn/core/storage/memory.py
def load_all_nodes(self) -> List[Node]:
    """Return every stored node as a new list."""
    return list(self._nodes.values())
load_graph()

Hydrate a DependencyGraph.

Source code in src/jnkn/core/storage/memory.py
def load_graph(self) -> DependencyGraph:
    """Hydrate a DependencyGraph.

    Builds a fresh graph from every stored node and edge; nodes are
    added first, then edges.
    """
    graph = DependencyGraph()
    for node in self._nodes.values():
        graph.add_node(node)
    for edge in self._edges.values():
        graph.add_edge(edge)
    return graph
load_node(node_id)

Load a node by ID.

Source code in src/jnkn/core/storage/memory.py
def load_node(self, node_id: str) -> Node | None:
    """Return the node stored under *node_id*, or None if absent."""
    return self._nodes.get(node_id)
query_ancestors(node_id, max_depth=-1)

Query ancestors via BFS.

Source code in src/jnkn/core/storage/memory.py
def query_ancestors(self, node_id: str, max_depth: int = -1) -> List[str]:
    """Query ancestors via BFS.

    Walks incoming edges breadth-first from *node_id*.

    Args:
        node_id: Starting node id.
        max_depth: Stop expanding past this depth; negative = unlimited.

    Returns:
        Ids of all transitive ancestors (order unspecified).
    """
    visited: Set[str] = set()
    # NOTE(review): list.pop(0) below is O(n) per dequeue, so this walk
    # is quadratic on large graphs; collections.deque would fix it.
    queue: List[Tuple[str, int]] = [(node_id, 0)]

    while queue:
        current, depth = queue.pop(0)

        if max_depth >= 0 and depth >= max_depth:
            continue

        for edge_key in self._edges_by_target.get(current, set()):
            edge = self._edges.get(edge_key)
            if edge and edge.source_id not in visited:
                visited.add(edge.source_id)
                queue.append((edge.source_id, depth + 1))

    return list(visited)
query_descendants(node_id, max_depth=-1)

Query descendants via BFS.

Source code in src/jnkn/core/storage/memory.py
def query_descendants(self, node_id: str, max_depth: int = -1) -> List[str]:
    """Query descendants via BFS.

    Walks outgoing edges breadth-first from *node_id*.

    Args:
        node_id: Starting node id.
        max_depth: Stop expanding past this depth; negative = unlimited.

    Returns:
        Ids of all transitively reachable targets (order unspecified).
    """
    visited: Set[str] = set()
    # NOTE(review): list.pop(0) below is O(n) per dequeue, so this walk
    # is quadratic on large graphs; collections.deque would fix it.
    queue: List[Tuple[str, int]] = [(node_id, 0)]

    while queue:
        current, depth = queue.pop(0)

        if max_depth >= 0 and depth >= max_depth:
            continue

        for edge_key in self._edges_by_source.get(current, set()):
            edge = self._edges.get(edge_key)
            if edge and edge.target_id not in visited:
                visited.add(edge.target_id)
                queue.append((edge.target_id, depth + 1))

    return list(visited)
save_edge(edge)

Persist a single edge.

Source code in src/jnkn/core/storage/memory.py
def save_edge(self, edge: Edge) -> None:
    """Persist a single edge.

    Upserts under the composite (source, target, type) key and records
    the key in both the by-source and by-target indices.
    """
    key = self._edge_key(edge.source_id, edge.target_id, edge.type.value)
    self._edges[key] = edge
    self._edges_by_source[edge.source_id].add(key)
    self._edges_by_target[edge.target_id].add(key)
save_edges_batch(edges)

Persist multiple edges.

Source code in src/jnkn/core/storage/memory.py
def save_edges_batch(self, edges: List[Edge]) -> int:
    """Persist multiple edges one by one.

    Returns:
        The number of edges given (all are saved).
    """
    for edge in edges:
        self.save_edge(edge)
    return len(edges)
save_node(node)

Persist a single node.

Source code in src/jnkn/core/storage/memory.py
def save_node(self, node: Node) -> None:
    """Persist a single node (upsert by id).

    NOTE(review): if a node is re-saved with a different ``type``, its
    id is not removed from the old type's bucket, leaving a stale entry
    in ``_nodes_by_type`` — worth confirming and fixing.
    """
    self._nodes[node.id] = node
    self._nodes_by_type[node.type].add(node.id)
save_nodes_batch(nodes)

Persist multiple nodes.

Source code in src/jnkn/core/storage/memory.py
def save_nodes_batch(self, nodes: List[Node]) -> int:
    """Persist multiple nodes one by one.

    Returns:
        The number of nodes given (all are saved).
    """
    for node in nodes:
        self.save_node(node)
    return len(nodes)
save_scan_metadata(metadata)

Save scan metadata.

Source code in src/jnkn/core/storage/memory.py
def save_scan_metadata(self, metadata: ScanMetadata) -> None:
    """Save scan metadata, keyed by its file path (upsert)."""
    self._scan_metadata[metadata.file_path] = metadata

SQLiteStorage

Bases: StorageAdapter

Persistent storage using local SQLite file.

Source code in src/jnkn/core/storage/sqlite.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
class SQLiteStorage(StorageAdapter):
    """
    Persistent storage using local SQLite file.
    """

    def __init__(self, db_path: Path):
        """Open (or create) the SQLite database and apply migrations.

        Args:
            db_path: Location of the database file; parent directories
                are created by ``_init_db`` if needed.
        """
        self.db_path = db_path
        self._init_db()

    @contextmanager
    def _connection(self):
        """Context manager for database connections with WAL mode.

        Yields a connection with ``sqlite3.Row`` as row factory. Commits
        on clean exit, rolls back on any exception (then re-raises), and
        always closes the handle.
        """
        conn = sqlite3.connect(
            self.db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
        )
        conn.row_factory = sqlite3.Row
        # Performance tuning for bulk writes
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        conn.execute("PRAGMA synchronous=NORMAL")
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def _init_db(self) -> None:
        """Initialize database schema with versioning.

        Creates the schema_version bookkeeping table if missing, then
        runs any migrations newer than the current recorded version.
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        with self._connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS schema_version (
                    version INTEGER PRIMARY KEY,
                    applied_at TEXT NOT NULL,
                    description TEXT
                )
            """)

            current_version = self._get_schema_version_internal(conn)

            if current_version < SCHEMA_VERSION:
                self._migrate(conn, current_version)

    def _get_schema_version_internal(self, conn: sqlite3.Connection) -> int:
        """Get schema version using existing connection.

        Returns 0 when schema_version has no rows (fresh database).
        """
        row = conn.execute("SELECT MAX(version) as v FROM schema_version").fetchone()
        return row["v"] if row and row["v"] else 0

    def _migrate(self, conn: sqlite3.Connection, from_version: int) -> None:
        """Run schema migrations.

        Each step is guarded by ``from_version`` and records itself in
        schema_version, so a migration is applied at most once and a
        fresh database replays all steps in order within one transaction
        (the caller's connection context).
        """

        # V1: Base Schema
        if from_version < 1:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS nodes (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    type TEXT NOT NULL,
                    path TEXT,
                    language TEXT,
                    file_hash TEXT,
                    tokens TEXT,
                    metadata TEXT,
                    created_at TEXT NOT NULL
                )
            """)

            conn.execute("""
                CREATE TABLE IF NOT EXISTS edges (
                    source_id TEXT NOT NULL,
                    target_id TEXT NOT NULL,
                    type TEXT NOT NULL,
                    confidence REAL DEFAULT 1.0,
                    match_strategy TEXT,
                    metadata TEXT,
                    created_at TEXT NOT NULL,
                    PRIMARY KEY (source_id, target_id, type)
                )
            """)

            conn.execute("""
                CREATE TABLE IF NOT EXISTS scan_metadata (
                    file_path TEXT PRIMARY KEY,
                    file_hash TEXT NOT NULL,
                    last_scanned TEXT NOT NULL,
                    node_count INTEGER DEFAULT 0,
                    edge_count INTEGER DEFAULT 0
                )
            """)

            conn.execute("CREATE INDEX IF NOT EXISTS idx_nodes_type ON nodes(type)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_nodes_path ON nodes(path)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_edges_source ON edges(source_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_edges_target ON edges(target_id)")

            conn.execute(
                """
                INSERT INTO schema_version (version, applied_at, description)
                VALUES (1, ?, 'Initial schema')
            """,
                (datetime.now(timezone.utc).isoformat(),),
            )

        # V2: Confidence Index
        if from_version < 2:
            conn.execute("CREATE INDEX IF NOT EXISTS idx_edges_confidence ON edges(confidence)")
            conn.execute(
                """
                INSERT INTO schema_version (version, applied_at, description)
                VALUES (2, ?, 'Added confidence index')
            """,
                (datetime.now(timezone.utc).isoformat(),),
            )

        # V3: Token Index and High-Confidence View
        if from_version < 3:
            # Token Index Table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS token_index (
                    token TEXT NOT NULL,
                    node_id TEXT NOT NULL,
                    PRIMARY KEY (token, node_id),
                    FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_token_lookup ON token_index(token)")

            # High Confidence View (Pruning optimization)
            conn.execute("""
                CREATE VIEW IF NOT EXISTS high_confidence_edges AS
                SELECT * FROM edges WHERE confidence >= 0.7
            """)

            conn.execute(
                """
                INSERT INTO schema_version (version, applied_at, description)
                VALUES (3, ?, 'Added token_index table and high_confidence_edges view')
            """,
                (datetime.now(timezone.utc).isoformat(),),
            )

    def get_schema_version(self) -> int:
        """Return the highest applied schema version (0 for a fresh DB)."""
        with self._connection() as conn:
            return self._get_schema_version_internal(conn)

    # --- Node Persistence ---

    def save_node(self, node: Node) -> None:
        """Persist a single node.

        Delegates to ``save_nodes_batch`` so the token index is updated
        through the same code path.
        """
        self.save_nodes_batch([node])

    def save_nodes_batch(self, nodes: List[Node]) -> int:
        """Persist multiple nodes and their tokens in a single transaction.

        Args:
            nodes: Nodes to upsert; tokens/metadata are JSON-serialized.

        Returns:
            The number of nodes given.

        NOTE(review): the ``IN ({placeholders})`` delete below binds one
        host parameter per node; very large batches may exceed SQLite's
        host-parameter limit — confirm batch sizes or chunk if needed.
        """
        if not nodes:
            return 0

        with self._connection() as conn:
            # 1. Upsert Nodes
            conn.executemany(
                """
                INSERT OR REPLACE INTO nodes 
                (id, name, type, path, language, file_hash, tokens, metadata, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
                [
                    (
                        n.id,
                        n.name,
                        n.type.value,
                        n.path,
                        n.language,
                        n.file_hash,
                        json.dumps(n.tokens),
                        json.dumps(n.metadata),
                        n.created_at.isoformat(),
                    )
                    for n in nodes
                ],
            )

            # 2. Update Token Index
            # Delete existing tokens for these nodes first to ensure clean state on update
            node_ids = [n.id for n in nodes]
            placeholders = ",".join("?" * len(node_ids))
            conn.execute(f"DELETE FROM token_index WHERE node_id IN ({placeholders})", node_ids)

            # Flatten tokens for batch insert
            token_entries = []
            for node in nodes:
                # Use tokens from the object, defaulting to computed ones if missing
                # (though model_post_init handles this usually)
                tokens = node.tokens or []
                for token in tokens:
                    token_entries.append((token, node.id))

            if token_entries:
                conn.executemany(
                    """
                    INSERT OR IGNORE INTO token_index (token, node_id)
                    VALUES (?, ?)
                """,
                    token_entries,
                )

        return len(nodes)

    def load_node(self, node_id: str) -> Node | None:
        """Return the node stored under *node_id*, or None if absent."""
        with self._connection() as conn:
            row = conn.execute("SELECT * FROM nodes WHERE id = ?", (node_id,)).fetchone()
            return self._row_to_node(row) if row else None

    def load_all_nodes(self) -> List[Node]:
        """Load every stored node; row-validation errors propagate."""
        nodes = []
        with self._connection() as conn:
            # Removed the try/except block to allow surfacing validation errors
            # (Section 2.1 of Architecture Review)
            for row in conn.execute("SELECT * FROM nodes").fetchall():
                nodes.append(self._row_to_node(row))
        return nodes

    def _row_to_node(self, row: sqlite3.Row) -> Node:
        """Deserialize a ``nodes`` table row back into a Node.

        JSON columns (tokens, metadata) default to empty containers when
        NULL; created_at is parsed from its ISO-8601 text form.
        """
        return Node(
            id=row["id"],
            name=row["name"],
            type=NodeType(row["type"]),
            path=row["path"],
            language=row["language"],
            file_hash=row["file_hash"],
            tokens=json.loads(row["tokens"]) if row["tokens"] else [],
            metadata=json.loads(row["metadata"]) if row["metadata"] else {},
            created_at=datetime.fromisoformat(row["created_at"]),
        )

    def delete_node(self, node_id: str) -> bool:
        """Delete a node plus every edge referencing it.

        Returns:
            True if a node row was actually removed.
        """
        with self._connection() as conn:
            # Cascading delete handles token_index via FK
            conn.execute(
                "DELETE FROM edges WHERE source_id = ? OR target_id = ?", (node_id, node_id)
            )
            cursor = conn.execute("DELETE FROM nodes WHERE id = ?", (node_id,))
            return cursor.rowcount > 0

    def delete_nodes_by_file(self, file_path: str) -> int:
        """Delete every node (and its edges) recorded under *file_path*.

        Returns:
            Number of node rows removed.

        NOTE(review): the ``IN ({placeholders})`` clauses bind one host
        parameter per node id (twice for edges); files with very many
        nodes may hit SQLite's host-parameter limit — confirm or chunk.
        """
        with self._connection() as conn:
            # Find IDs
            rows = conn.execute("SELECT id FROM nodes WHERE path = ?", (file_path,)).fetchall()
            node_ids = [row["id"] for row in rows]

            if not node_ids:
                return 0

            placeholders = ",".join("?" * len(node_ids))

            # Delete Edges
            conn.execute(
                f"""
                DELETE FROM edges 
                WHERE source_id IN ({placeholders}) OR target_id IN ({placeholders})
            """,
                node_ids + node_ids,
            )

            # Delete Nodes (Cascades to token_index)
            cursor = conn.execute(f"DELETE FROM nodes WHERE id IN ({placeholders})", node_ids)
            return cursor.rowcount

    # --- Edge Persistence ---

    def save_edge(self, edge: Edge) -> None:
        """Persist a single edge via the batch path."""
        self.save_edges_batch([edge])

    def save_edges_batch(self, edges: List[Edge]) -> int:
        """Upsert multiple edges in a single transaction.

        Edges are keyed by (source_id, target_id, type); metadata is
        JSON-serialized and created_at stored as ISO-8601 text.

        Returns:
            The number of edges given.
        """
        if not edges:
            return 0

        with self._connection() as conn:
            conn.executemany(
                """
                INSERT OR REPLACE INTO edges 
                (source_id, target_id, type, confidence, match_strategy, metadata, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
                [
                    (
                        e.source_id,
                        e.target_id,
                        e.type.value,
                        e.confidence,
                        e.match_strategy.value if e.match_strategy else None,
                        json.dumps(e.metadata),
                        e.created_at.isoformat(),
                    )
                    for e in edges
                ],
            )
        return len(edges)

    def load_all_edges(self, min_confidence: float = 0.0) -> List[Edge]:
        """
        Load all edges, optionally filtering by confidence.

        Uses the high_confidence_edges view as a pre-filter when the
        threshold is at least 0.7.

        Args:
            min_confidence: Minimum edge confidence to include.

        Returns:
            All edges with confidence >= min_confidence.
        """
        table = "high_confidence_edges" if min_confidence >= 0.7 else "edges"
        query = f"SELECT * FROM {table}"
        params = []

        # Fix: the WHERE clause was only added for 0 < threshold < 0.7,
        # so a request like min_confidence=0.9 silently returned edges
        # down to the view's 0.7 cutoff. Apply the caller's threshold
        # whenever it is above zero (redundant but harmless at 0.7).
        if min_confidence > 0.0:
            query += " WHERE confidence >= ?"
            params.append(min_confidence)

        edges = []
        with self._connection() as conn:
            # Fix: the previous `except Exception: pass` silently dropped
            # rows that failed validation, hiding data corruption. Let
            # errors surface, consistent with load_all_nodes (Section 2.1
            # of Architecture Review).
            for row in conn.execute(query, params).fetchall():
                edges.append(self._row_to_edge(row))
        return edges

    def _row_to_edge(self, row: sqlite3.Row) -> Edge:
        """Deserialize an ``edges`` table row back into an Edge.

        match_strategy and metadata are optional columns; created_at is
        parsed from its ISO-8601 text form.
        """
        return Edge(
            source_id=row["source_id"],
            target_id=row["target_id"],
            type=RelationshipType(row["type"]),
            confidence=row["confidence"],
            match_strategy=MatchStrategy(row["match_strategy"]) if row["match_strategy"] else None,
            metadata=json.loads(row["metadata"]) if row["metadata"] else {},
            created_at=datetime.fromisoformat(row["created_at"]),
        )

    def delete_edges_by_source(self, source_id: str) -> int:
        """Delete all edges originating at *source_id*; returns the count."""
        with self._connection() as conn:
            cursor = conn.execute("DELETE FROM edges WHERE source_id = ?", (source_id,))
            return cursor.rowcount

    # --- Scan Metadata Persistence ---

    def save_scan_metadata(self, metadata: ScanMetadata) -> None:
        """Upsert the scan record for a file, keyed by file_path."""
        with self._connection() as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO scan_metadata 
                (file_path, file_hash, last_scanned, node_count, edge_count)
                VALUES (?, ?, ?, ?, ?)
            """,
                (
                    metadata.file_path,
                    metadata.file_hash,
                    metadata.last_scanned.isoformat(),
                    metadata.node_count,
                    metadata.edge_count,
                ),
            )

    def get_scan_metadata(self, file_path: str) -> ScanMetadata | None:
        """Return the scan record for *file_path*, or None if not tracked."""
        with self._connection() as conn:
            row = conn.execute(
                "SELECT * FROM scan_metadata WHERE file_path = ?", (file_path,)
            ).fetchone()

            if not row:
                return None

            return ScanMetadata(
                file_path=row["file_path"],
                file_hash=row["file_hash"],
                last_scanned=datetime.fromisoformat(row["last_scanned"]),
                node_count=row["node_count"],
                edge_count=row["edge_count"],
            )

    def get_all_scan_metadata(self) -> List[ScanMetadata]:
        """Return scan records for every tracked file."""
        with self._connection() as conn:
            rows = conn.execute("SELECT * FROM scan_metadata").fetchall()
            return [
                ScanMetadata(
                    file_path=row["file_path"],
                    file_hash=row["file_hash"],
                    last_scanned=datetime.fromisoformat(row["last_scanned"]),
                    node_count=row["node_count"],
                    edge_count=row["edge_count"],
                )
                for row in rows
            ]

    def delete_scan_metadata(self, file_path: str) -> bool:
        """Delete the scan record for a file; True if a row was removed."""
        with self._connection() as conn:
            cursor = conn.execute("DELETE FROM scan_metadata WHERE file_path = ?", (file_path,))
            return cursor.rowcount > 0

    # --- Graph Hydration ---

    def load_graph(self) -> DependencyGraph:
        """
        Hydrate a full DependencyGraph from storage.

        Loads every node, then every edge (at the default confidence
        threshold), into a fresh graph. (An earlier docstring claimed a
        token-index optimization; this code simply loads nodes and edges.)
        """
        graph = DependencyGraph()

        # Load nodes
        all_nodes = self.load_all_nodes()
        for node in all_nodes:
            graph.add_node(node)

        # Load edges
        all_edges = self.load_all_edges()
        for edge in all_edges:
            graph.add_edge(edge)

        return graph

    # --- Traversal Queries ---

    def query_descendants(self, node_id: str, max_depth: int = -1) -> List[str]:
        """Query all descendants using a recursive CTE.

        Args:
            node_id: Starting node id.
            max_depth: Maximum traversal depth; negative means unlimited.

        Returns:
            Distinct ids of all transitively reachable targets.
        """
        with self._connection() as conn:
            if max_depth < 0:
                # Fix: the previous query carried a `depth` column in the
                # recursive part, so UNION's row de-duplication never fired
                # on cyclic graphs (the same id kept reappearing with a
                # larger depth) and the CTE recursed forever. Recursing on
                # the bare id lets UNION terminate on any graph.
                rows = conn.execute(
                    """
                    WITH RECURSIVE descendants AS (
                        SELECT target_id as id
                        FROM edges WHERE source_id = ?
                        UNION
                        SELECT e.target_id
                        FROM edges e JOIN descendants d ON e.source_id = d.id
                    )
                    SELECT id FROM descendants
                """,
                    (node_id,),
                ).fetchall()
            else:
                # Depth-limited variant: the depth column is required here,
                # and the `d.depth < ?` bound guarantees termination.
                rows = conn.execute(
                    """
                    WITH RECURSIVE descendants AS (
                        SELECT target_id as id, 1 as depth
                        FROM edges WHERE source_id = ?
                        UNION
                        SELECT e.target_id, d.depth + 1
                        FROM edges e JOIN descendants d ON e.source_id = d.id
                        WHERE d.depth < ?
                    )
                    SELECT DISTINCT id FROM descendants
                """,
                    (node_id, max_depth),
                ).fetchall()
            return [row["id"] for row in rows]

    def query_ancestors(self, node_id: str, max_depth: int = -1) -> List[str]:
        """
        Return IDs of all nodes that can reach *node_id* via incoming edges.

        Uses a recursive CTE; ``max_depth < 0`` means unlimited depth.

        Bug fix: the unlimited-depth variant previously tracked a ``depth``
        column. Because UNION deduplicates whole rows and depth grows on
        every step, a cycle in the edge set kept producing fresh
        (id, depth) rows and the query never terminated. The unlimited
        variant now selects IDs only, so UNION's row dedup guarantees
        termination on cyclic graphs. The bounded variant keeps depth (it
        needs it for the cutoff) and terminates because depth is capped.
        """
        with self._connection() as conn:
            if max_depth < 0:
                rows = conn.execute(
                    """
                    WITH RECURSIVE ancestors AS (
                        SELECT source_id AS id
                        FROM edges WHERE target_id = ?
                        UNION
                        SELECT e.source_id
                        FROM edges e JOIN ancestors a ON e.target_id = a.id
                    )
                    SELECT id FROM ancestors
                """,
                    (node_id,),
                ).fetchall()
            else:
                rows = conn.execute(
                    """
                    WITH RECURSIVE ancestors AS (
                        SELECT source_id AS id, 1 AS depth
                        FROM edges WHERE target_id = ?
                        UNION
                        SELECT e.source_id, a.depth + 1
                        FROM edges e JOIN ancestors a ON e.target_id = a.id
                        WHERE a.depth < ?
                    )
                    SELECT DISTINCT id FROM ancestors
                """,
                    (node_id, max_depth),
                ).fetchall()
            return [row["id"] for row in rows]

    def get_stats(self) -> Dict[str, Any]:
        """Summarize storage contents: row counts, per-type breakdowns, DB size."""

        def count(conn, sql: str) -> int:
            # All count queries alias the count as "c".
            return conn.execute(sql).fetchone()["c"]

        with self._connection() as conn:
            nodes_by_type = {
                row["type"]: row["c"]
                for row in conn.execute(
                    "SELECT type, COUNT(*) as c FROM nodes GROUP BY type"
                )
            }
            edges_by_type = {
                row["type"]: row["c"]
                for row in conn.execute(
                    "SELECT type, COUNT(*) as c FROM edges GROUP BY type"
                )
            }
            return {
                "schema_version": self._get_schema_version_internal(conn),
                "total_nodes": count(conn, "SELECT COUNT(*) as c FROM nodes"),
                "total_edges": count(conn, "SELECT COUNT(*) as c FROM edges"),
                "tracked_files": count(conn, "SELECT COUNT(*) as c FROM scan_metadata"),
                "indexed_tokens": count(conn, "SELECT COUNT(*) as c FROM token_index"),
                "nodes_by_type": nodes_by_type,
                "edges_by_type": edges_by_type,
                # stat() only valid for an existing file; 0 for a fresh/in-memory DB path.
                "db_size_bytes": self.db_path.stat().st_size if self.db_path.exists() else 0,
            }

    def clear(self) -> None:
        """Clear all data."""
        # Order matters: children (edges, token_index) go before nodes so the
        # deletes succeed regardless of FK enforcement; token_index delete is
        # explicit even though cascade would cover it.
        tables = ("edges", "token_index", "nodes", "scan_metadata")
        with self._connection() as conn:
            for table in tables:
                conn.execute(f"DELETE FROM {table}")

    def close(self) -> None:
        """
        Close connections if needed.

        Intentional no-op: every operation in this adapter opens its own
        connection through the ``self._connection()`` context manager, so
        there is no persistent handle stored on the instance to release here.
        NOTE(review): assumes ``_connection`` does not cache a connection —
        confirm against its definition.
        """
        pass
Functions
clear()

Clear all data.

Source code in src/jnkn/core/storage/sqlite.py
def clear(self) -> None:
    """Clear all data."""
    with self._connection() as conn:
        conn.execute("DELETE FROM edges")
        conn.execute("DELETE FROM token_index")  # Explicit, though cascade handles it
        conn.execute("DELETE FROM nodes")
        conn.execute("DELETE FROM scan_metadata")
close()

Close connections if needed.

Source code in src/jnkn/core/storage/sqlite.py
def close(self) -> None:
    """Close connections if needed."""
    pass
get_stats()

Get storage statistics.

Source code in src/jnkn/core/storage/sqlite.py
def get_stats(self) -> Dict[str, Any]:
    """Get storage statistics."""
    with self._connection() as conn:
        node_count = conn.execute("SELECT COUNT(*) as c FROM nodes").fetchone()["c"]
        edge_count = conn.execute("SELECT COUNT(*) as c FROM edges").fetchone()["c"]
        file_count = conn.execute("SELECT COUNT(*) as c FROM scan_metadata").fetchone()["c"]
        token_count = conn.execute("SELECT COUNT(*) as c FROM token_index").fetchone()["c"]

        type_rows = conn.execute(
            "SELECT type, COUNT(*) as c FROM nodes GROUP BY type"
        ).fetchall()

        edge_type_rows = conn.execute(
            "SELECT type, COUNT(*) as c FROM edges GROUP BY type"
        ).fetchall()

        return {
            "schema_version": self._get_schema_version_internal(conn),
            "total_nodes": node_count,
            "total_edges": edge_count,
            "tracked_files": file_count,
            "indexed_tokens": token_count,
            "nodes_by_type": {row["type"]: row["c"] for row in type_rows},
            "edges_by_type": {row["type"]: row["c"] for row in edge_type_rows},
            "db_size_bytes": self.db_path.stat().st_size if self.db_path.exists() else 0,
        }
load_all_edges(min_confidence=0.0)

Load all edges, optionally filtering by confidence. Uses the high_confidence_edges view if threshold >= 0.7.

Source code in src/jnkn/core/storage/sqlite.py
def load_all_edges(self, min_confidence: float = 0.0) -> List[Edge]:
    """
    Load all edges, optionally filtering by confidence.
    Uses the high_confidence_edges view if threshold >= 0.7.
    """
    table = "high_confidence_edges" if min_confidence >= 0.7 else "edges"
    query = f"SELECT * FROM {table}"
    params = []

    if 0.0 < min_confidence < 0.7:
        query += " WHERE confidence >= ?"
        params.append(min_confidence)

    edges = []
    with self._connection() as conn:
        for row in conn.execute(query, params).fetchall():
            try:
                edges.append(self._row_to_edge(row))
            except Exception:
                pass
    return edges
load_graph()

Hydrate a full DependencyGraph from storage.

Loads all persisted nodes and edges into a fresh graph; in-memory indexes are rebuilt by the graph as items are added (no separate token-index load occurs).

Source code in src/jnkn/core/storage/sqlite.py
def load_graph(self) -> DependencyGraph:
    """
    Hydrate a full DependencyGraph from storage.

    Optimized to load token index from DB rather than rebuilding it.
    """
    graph = DependencyGraph()

    # Load nodes
    all_nodes = self.load_all_nodes()
    for node in all_nodes:
        graph.add_node(node)

    # Load edges
    all_edges = self.load_all_edges()
    for edge in all_edges:
        graph.add_edge(edge)

    return graph
query_ancestors(node_id, max_depth=-1)

Query all ancestors using recursive CTE.

Source code in src/jnkn/core/storage/sqlite.py
def query_ancestors(self, node_id: str, max_depth: int = -1) -> List[str]:
    """Query all ancestors using recursive CTE."""
    with self._connection() as conn:
        if max_depth < 0:
            rows = conn.execute(
                """
                WITH RECURSIVE ancestors AS (
                    SELECT source_id as id, 1 as depth
                    FROM edges WHERE target_id = ?
                    UNION
                    SELECT e.source_id, a.depth + 1
                    FROM edges e JOIN ancestors a ON e.target_id = a.id
                )
                SELECT DISTINCT id FROM ancestors
            """,
                (node_id,),
            ).fetchall()
        else:
            rows = conn.execute(
                """
                WITH RECURSIVE ancestors AS (
                    SELECT source_id as id, 1 as depth
                    FROM edges WHERE target_id = ?
                    UNION
                    SELECT e.source_id, a.depth + 1
                    FROM edges e JOIN ancestors a ON e.target_id = a.id
                    WHERE a.depth < ?
                )
                SELECT DISTINCT id FROM ancestors
            """,
                (node_id, max_depth),
            ).fetchall()
        return [row["id"] for row in rows]
query_descendants(node_id, max_depth=-1)

Query all descendants using recursive CTE.

Source code in src/jnkn/core/storage/sqlite.py
def query_descendants(self, node_id: str, max_depth: int = -1) -> List[str]:
    """Query all descendants using recursive CTE."""
    with self._connection() as conn:
        if max_depth < 0:
            rows = conn.execute(
                """
                WITH RECURSIVE descendants AS (
                    SELECT target_id as id, 1 as depth
                    FROM edges WHERE source_id = ?
                    UNION
                    SELECT e.target_id, d.depth + 1
                    FROM edges e JOIN descendants d ON e.source_id = d.id
                )
                SELECT DISTINCT id FROM descendants
            """,
                (node_id,),
            ).fetchall()
        else:
            rows = conn.execute(
                """
                WITH RECURSIVE descendants AS (
                    SELECT target_id as id, 1 as depth
                    FROM edges WHERE source_id = ?
                    UNION
                    SELECT e.target_id, d.depth + 1
                    FROM edges e JOIN descendants d ON e.source_id = d.id
                    WHERE d.depth < ?
                )
                SELECT DISTINCT id FROM descendants
            """,
                (node_id, max_depth),
            ).fetchall()
        return [row["id"] for row in rows]
save_node(node)

Persist a single node.

Source code in src/jnkn/core/storage/sqlite.py
def save_node(self, node: Node) -> None:
    """Persist a single node."""
    self.save_nodes_batch([node])
save_nodes_batch(nodes)

Persist multiple nodes and their tokens in a single transaction.

Source code in src/jnkn/core/storage/sqlite.py
def save_nodes_batch(self, nodes: List[Node]) -> int:
    """Persist multiple nodes and their tokens in a single transaction."""
    if not nodes:
        return 0

    with self._connection() as conn:
        # 1. Upsert Nodes
        conn.executemany(
            """
            INSERT OR REPLACE INTO nodes 
            (id, name, type, path, language, file_hash, tokens, metadata, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
            [
                (
                    n.id,
                    n.name,
                    n.type.value,
                    n.path,
                    n.language,
                    n.file_hash,
                    json.dumps(n.tokens),
                    json.dumps(n.metadata),
                    n.created_at.isoformat(),
                )
                for n in nodes
            ],
        )

        # 2. Update Token Index
        # Delete existing tokens for these nodes first to ensure clean state on update
        node_ids = [n.id for n in nodes]
        placeholders = ",".join("?" * len(node_ids))
        conn.execute(f"DELETE FROM token_index WHERE node_id IN ({placeholders})", node_ids)

        # Flatten tokens for batch insert
        token_entries = []
        for node in nodes:
            # Use tokens from the object, defaulting to computed ones if missing
            # (though model_post_init handles this usually)
            tokens = node.tokens or []
            for token in tokens:
                token_entries.append((token, node.id))

        if token_entries:
            conn.executemany(
                """
                INSERT OR IGNORE INTO token_index (token, node_id)
                VALUES (?, ?)
            """,
                token_entries,
            )

    return len(nodes)

StorageAdapter

Bases: ABC

Abstract interface for persistence strategies.

Implementations must provide: - Node and edge persistence - Graph hydration (loading) - Incremental scan support via file metadata - Batch operations for performance

Source code in src/jnkn/core/storage/base.py
class StorageAdapter(ABC):
    """
    Abstract interface for persistence strategies.

    Implementations must provide:
    - Node and edge persistence
    - Graph hydration (loading)
    - Incremental scan support via file metadata
    - Batch operations for performance
    """

    @abstractmethod
    def save_node(self, node: Node) -> None:
        """Persist a single node."""
        pass

    @abstractmethod
    def save_nodes_batch(self, nodes: List[Node]) -> int:
        """Persist multiple nodes in a single transaction."""
        pass

    @abstractmethod
    def save_edge(self, edge: Edge) -> None:
        """Persist a single edge."""
        pass

    @abstractmethod
    def save_edges_batch(self, edges: List[Edge]) -> int:
        """Persist multiple edges in a single transaction."""
        pass

    @abstractmethod
    def load_node(self, node_id: str) -> Node | None:
        """Load a node by ID."""
        pass

    @abstractmethod
    def load_all_nodes(self) -> List[Node]:
        """Load all nodes from storage."""
        pass

    @abstractmethod
    def load_all_edges(self) -> List[Edge]:
        """Load all edges from storage."""
        pass

    @abstractmethod
    def load_graph(self) -> DependencyGraph:
        """Hydrate a full DependencyGraph from storage."""
        pass

    @abstractmethod
    def delete_node(self, node_id: str) -> bool:
        """Delete a node and its associated edges."""
        pass

    @abstractmethod
    def delete_nodes_by_file(self, file_path: str) -> int:
        """Delete all nodes originating from a file."""
        pass

    @abstractmethod
    def delete_edges_by_source(self, source_id: str) -> int:
        """Delete all edges from a source node."""
        pass

    @abstractmethod
    def save_scan_metadata(self, metadata: ScanMetadata) -> None:
        """Save file scan metadata for incremental scanning."""
        pass

    @abstractmethod
    def get_scan_metadata(self, file_path: str) -> ScanMetadata | None:
        """Get scan metadata for a file."""
        pass

    @abstractmethod
    def get_all_scan_metadata(self) -> List[ScanMetadata]:
        """Get scan metadata for all files."""
        pass

    @abstractmethod
    def delete_scan_metadata(self, file_path: str) -> bool:
        """Delete scan metadata for a file."""
        pass

    @abstractmethod
    def query_descendants(self, node_id: str, max_depth: int = -1) -> List[str]:
        """Query all descendants of a node."""
        pass

    @abstractmethod
    def query_ancestors(self, node_id: str, max_depth: int = -1) -> List[str]:
        """Query all ancestors of a node."""
        pass

    @abstractmethod
    def get_schema_version(self) -> int:
        """Get the current schema version."""
        pass

    @abstractmethod
    def get_stats(self) -> Dict[str, Any]:
        """Get storage statistics."""
        pass

    @abstractmethod
    def clear(self) -> None:
        """Clear all data from storage."""
        pass

    @abstractmethod
    def close(self) -> None:
        """Close any open connections."""
        pass
Functions
clear() abstractmethod

Clear all data from storage.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def clear(self) -> None:
    """Clear all data from storage."""
    pass
close() abstractmethod

Close any open connections.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def close(self) -> None:
    """Close any open connections."""
    pass
delete_edges_by_source(source_id) abstractmethod

Delete all edges from a source node.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def delete_edges_by_source(self, source_id: str) -> int:
    """Delete all edges from a source node."""
    pass
delete_node(node_id) abstractmethod

Delete a node and its associated edges.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def delete_node(self, node_id: str) -> bool:
    """Delete a node and its associated edges."""
    pass
delete_nodes_by_file(file_path) abstractmethod

Delete all nodes originating from a file.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def delete_nodes_by_file(self, file_path: str) -> int:
    """Delete all nodes originating from a file."""
    pass
delete_scan_metadata(file_path) abstractmethod

Delete scan metadata for a file.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def delete_scan_metadata(self, file_path: str) -> bool:
    """Delete scan metadata for a file."""
    pass
get_all_scan_metadata() abstractmethod

Get scan metadata for all files.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def get_all_scan_metadata(self) -> List[ScanMetadata]:
    """Get scan metadata for all files."""
    pass
get_scan_metadata(file_path) abstractmethod

Get scan metadata for a file.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def get_scan_metadata(self, file_path: str) -> ScanMetadata | None:
    """Get scan metadata for a file."""
    pass
get_schema_version() abstractmethod

Get the current schema version.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def get_schema_version(self) -> int:
    """Get the current schema version."""
    pass
get_stats() abstractmethod

Get storage statistics.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def get_stats(self) -> Dict[str, Any]:
    """Get storage statistics."""
    pass
load_all_edges() abstractmethod

Load all edges from storage.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def load_all_edges(self) -> List[Edge]:
    """Load all edges from storage."""
    pass
load_all_nodes() abstractmethod

Load all nodes from storage.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def load_all_nodes(self) -> List[Node]:
    """Load all nodes from storage."""
    pass
load_graph() abstractmethod

Hydrate a full DependencyGraph from storage.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def load_graph(self) -> DependencyGraph:
    """Hydrate a full DependencyGraph from storage."""
    pass
load_node(node_id) abstractmethod

Load a node by ID.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def load_node(self, node_id: str) -> Node | None:
    """Load a node by ID."""
    pass
query_ancestors(node_id, max_depth=-1) abstractmethod

Query all ancestors of a node.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def query_ancestors(self, node_id: str, max_depth: int = -1) -> List[str]:
    """Query all ancestors of a node."""
    pass
query_descendants(node_id, max_depth=-1) abstractmethod

Query all descendants of a node.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def query_descendants(self, node_id: str, max_depth: int = -1) -> List[str]:
    """Query all descendants of a node."""
    pass
save_edge(edge) abstractmethod

Persist a single edge.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def save_edge(self, edge: Edge) -> None:
    """Persist a single edge."""
    pass
save_edges_batch(edges) abstractmethod

Persist multiple edges in a single transaction.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def save_edges_batch(self, edges: List[Edge]) -> int:
    """Persist multiple edges in a single transaction."""
    pass
save_node(node) abstractmethod

Persist a single node.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def save_node(self, node: Node) -> None:
    """Persist a single node."""
    pass
save_nodes_batch(nodes) abstractmethod

Persist multiple nodes in a single transaction.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def save_nodes_batch(self, nodes: List[Node]) -> int:
    """Persist multiple nodes in a single transaction."""
    pass
save_scan_metadata(metadata) abstractmethod

Save file scan metadata for incremental scanning.

Source code in src/jnkn/core/storage/base.py
@abstractmethod
def save_scan_metadata(self, metadata: ScanMetadata) -> None:
    """Save file scan metadata for incremental scanning."""
    pass