# Architecture

🦉 Owl · Go · Temporal · PostgreSQL · Kafka
## System Context

```mermaid
C4Context
    title System Context — Owl within joel.holmes.haus
    Person(admin, "Admin", "Manages personal library of books and academic papers")
    Boundary(platform, "joel.holmes.haus Platform") {
        System(ui, "joel.holmes.haus", "Go-app WASM admin SPA")
        System(owl, "Owl", "Personal digital library — books and academic papers")
        System(magpie, "Magpie", "Resource hub — receives book and paper resources from Owl")
        System(shrike, "Shrike", "Search — indexes paper/book text via TextExtractedEvent")
    }
    SystemDb(postgres, "PostgreSQL", "Books, papers, book metadata")
    SystemDb(minio, "MinIO / S3", "PDF and book file blobs")
    SystemQueue(kafka, "Kafka", "owl.v1.Paper · owl.v1.Book · magpie.v1.Resource · shrike.v1.TextExtractedEvent")
    System_Ext(arxiv, "arXiv / CrossRef", "Academic paper metadata APIs")
    System_Ext(googlebooks, "Google Books / Open Library", "Book metadata APIs (enrichment)")
    Rel(admin, ui, "Uses")
    Rel(ui, owl, "ConnectRPC")
    Rel(owl, postgres, "Reads / writes")
    Rel(owl, minio, "Stores PDFs and book files")
    Rel(owl, kafka, "Publishes paper and book events")
    Rel(owl, arxiv, "Resolves DOI / arXiv metadata")
    Rel(owl, googlebooks, "Enriches book metadata")
    Rel(kafka, magpie, "magpie.v1.Resource")
    Rel(kafka, shrike, "TextExtractedEvent")
```
## Container Diagram

```mermaid
C4Container
    title Owl — Internal Containers
    Boundary(owl, "Owl") {
        Container(api, "cmd/api", "Go / ConnectRPC h2c :9000", "BookService · PaperService · in-process Ingester goroutine")
        Container(worker, "cmd/worker", "Go / Kafka", "BookConsumer · PaperConsumer · Ingester pipeline · BookIngester pipeline")
        Container(bookSvc, "book.Service", "Go", "CRUD for books")
        Container(paperSvc, "paper.Service", "Go", "CRUD for papers · IngestPaperByURL · GetIngestionStatus")
        Container(ingester, "ingestion.Ingester", "Go / goroutine", "7-step paper pipeline: resolve → download → metadata → dedup → upload → persist → magpie")
        Container(bookIngester, "ingestion.BookIngester", "Go / goroutine", "4-step book pipeline: download → dedup → upload → persist")
        Container(metaClient, "metadata.CompositeClient", "Go / HTTP", "ArxivClient + CrossRefClient for DOI/arXiv resolution")
        ContainerDb(bookRepo, "BookRepo + BookMetadataRepo", "PostgreSQL / squirrel", "books · book_metadata tables")
        ContainerDb(paperRepo, "PaperRepo", "PostgreSQL / squirrel", "papers table")
    }
    SystemDb(postgres, "PostgreSQL", "")
    SystemDb(minio, "MinIO / S3", "papers/CHECKSUM · books/CHECKSUM")
    SystemQueue(kafka, "Kafka", "")
    System_Ext(arxiv, "arXiv / CrossRef", "")
    Rel(api, bookSvc, "delegates")
    Rel(api, paperSvc, "delegates")
    Rel(api, ingester, "Enqueue() / Status()")
    Rel(kafka, worker, "owl.v1.Book · owl.v1.Paper")
    Rel(worker, bookSvc, "BookConsumer")
    Rel(worker, paperSvc, "PaperConsumer")
    Rel(worker, ingester, "Enqueue()")
    Rel(worker, bookIngester, "Enqueue()")
    Rel(ingester, paperRepo, "Create")
    Rel(ingester, minio, "Upload PDF")
    Rel(ingester, kafka, "magpie.v1.Resource")
    Rel(ingester, metaClient, "Resolve DOI / arXiv")
    Rel(bookIngester, bookRepo, "Create")
    Rel(bookIngester, minio, "Upload book file")
    Rel(bookRepo, postgres, "SQL")
    Rel(paperRepo, postgres, "SQL")
```
## Overview

Owl is a Go service (module github.com/holmes89/owl) that provides a personal digital library for books and academic papers. Three runtime processes share the same Go module:

| Process | Entry point | Role |
|---|---|---|
| API server | cmd/api/main.go | Serves Connect-RPC on :9000; enqueues ingestion requests |
| Worker | cmd/worker/main.go | Runs ingestion pipelines + Kafka consumers |
| CLI | main.go → cmd/ | Cobra CLI client speaking gRPC to localhost:9000 |
## Process Architecture

```mermaid
graph TD
    Client["Client\nCLI · curl · UI"]
    subgraph api_box["cmd/api :9000"]
        BookGRPC["BookService handler"]
        PaperGRPC["PaperService handler\nIngestPaperByURL · GetIngestionStatus"]
        APIIngester["Ingester goroutine\nbuffered channel"]
    end
    subgraph worker_box["cmd/worker"]
        BookConsumer["BookConsumer\nKafka: owl.v1.Book"]
        PaperConsumer["PaperConsumer\nKafka: owl.v1.Paper"]
        WIngester["Ingester goroutine\nrunPaperPipeline 7 steps"]
        WBookIngester["BookIngester goroutine\nrunBookPipeline 4 steps"]
        Meta["CompositeClient\nArxivClient · CrossRefClient"]
    end
    Client -->|"Connect-RPC HTTP/2"| BookGRPC
    Client -->|"Connect-RPC HTTP/2"| PaperGRPC
    PaperGRPC -->|"Enqueue() / Status()"| APIIngester
    BookGRPC -->|"owl.v1.Book"| Kafka[("Kafka")]
    PaperGRPC -->|"owl.v1.Paper"| Kafka
    Kafka --> BookConsumer
    Kafka --> PaperConsumer
    BookConsumer -->|"Enqueue()"| WBookIngester
    PaperConsumer -->|"Enqueue() or direct Create"| WIngester
    WIngester --> Meta
    WIngester --> MinIO[("MinIO / S3")]
    WIngester --> PG[("PostgreSQL")]
    WBookIngester --> MinIO
    WBookIngester --> PG
    APIIngester --> Meta
    APIIngester --> PG
```
## Ingestion Pipeline Detail
### Paper Pipeline (runPaperPipeline — 7 steps)

| Step | Function | Notes |
|---|---|---|
| 1 | ResolveSourceActivity | arXiv/DOI detection; optional MetadataClient fetch |
| 2 | DownloadFileActivity | HTTP GET → temp file; SHA-256 checksum |
| 3 | ExtractMetadataActivity | stub (returns en); non-fatal |
| 4 | DeduplicateActivity | FindByChecksum + FindByDOI; short-circuits if duplicate |
| 5 | UploadToStorageActivity | uploads to papers/{checksum}; no-op if Storage == nil |
| 6 | PersistMetadataActivity | PaperRepo.Create — required |
| 7 | SubmitToMagpieActivity | stub — non-fatal |
### Book Pipeline (runBookPipeline — 4 steps)

| Step | Function | Notes |
|---|---|---|
| 1 | DownloadFileActivity | HTTP GET → temp file; SHA-256 checksum |
| 2 | DeduplicateBookActivity | FindByChecksum only |
| 3 | UploadBookToStorageActivity | uploads to books/{checksum} |
| 4 | PersistBookActivity | BookRepo.Create — required |
## Request ID Strategy

Deterministic IDs prevent duplicate concurrent ingestion and correlate API calls to status queries:

| URL type | Request ID |
|---|---|
| arxiv.org/abs/{id} | owl-ingest-arxiv-{id} |
| doi.org/... | owl-ingest-doi-{sha256[:8]} |
| Other | owl-ingest-url-{sha256[:8]} |
Ingester.Enqueue marks the request as running; once the pipeline finishes, Start updates the status to completed, failed, or duplicate. Status is held in memory only, so it does not survive a process restart.
## Dependency Injection

```mermaid
graph LR
    subgraph api_di["cmd/api/main.go"]
        DB["repo.NewDatabase"] --> BookRepo --> BookSvc["book.NewBookService"] --> BookGRPC["bookgrpc.NewBookService"]
        DB --> PaperRepo --> PaperSvc["paper.NewPaperService"] --> PaperGRPC["papergrpc.NewPaperService\n.WithIngester()"]
        Acts["ingestion.Activities\nPaperRepo · BookRepo · MetadataClient"] --> Ing["ingestion.NewIngester"] -->|"go Start()"| IngR["Ingester running"]
    end
    subgraph worker_di["cmd/worker/main.go"]
        WDB["repo.NewDatabase"] --> WBookRepo --> WBookSvc["book.NewBookService"] --> BookCon["book.NewBookConsumer\nkafka · svc · bookIngester"]
        WDB --> WPaperRepo --> WPaperSvc["paper.NewPaperService"] --> PaperCon["paper.NewPaperConsumer\nkafka · svc · ingester"]
        WActs["ingestion.Activities\n+ MetadataClient + Storage"] --> WIng["ingestion.NewIngester"] -->|"go Start()"| WIngR["Ingester running"]
        WActs --> WBIng["ingestion.NewBookIngester"] -->|"go Start()"| WBIngR["BookIngester running"]
    end
```
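The wiring above is plain constructor injection: each constructor takes its dependencies as arguments and main composes the chain. A toy sketch of the same shape, with stand-in types rather than the real owl packages:

```go
package main

import "fmt"

// Stand-in constructor chain mirroring cmd/api/main.go:
// database -> repo -> service -> transport handler.
// All names here are illustrative, not the real owl types.
type Database struct{ dsn string }

func NewDatabase(dsn string) *Database { return &Database{dsn: dsn} }

type PaperRepo struct{ db *Database }

func NewPaperRepo(db *Database) *PaperRepo { return &PaperRepo{db: db} }

type PaperService struct{ repo *PaperRepo }

func NewPaperService(r *PaperRepo) *PaperService { return &PaperService{repo: r} }

type PaperHandler struct{ svc *PaperService }

func NewPaperHandler(s *PaperService) *PaperHandler { return &PaperHandler{svc: s} }

func main() {
	// main is the only place that knows the full dependency graph.
	db := NewDatabase("postgres://localhost/owl")
	h := NewPaperHandler(NewPaperService(NewPaperRepo(db)))
	fmt.Println("wired handler against:", h.svc.repo.db.dsn)
}
```

Keeping the graph in main (no DI framework) means every dependency is visible at the composition root, which is why both binaries can share repos and services from the same module.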
## Concurrency and Error Handling

- Enqueue blocks when the channel is full — the Kafka consumer goroutine provides natural backpressure; no messages are silently dropped.
- Steps 3 (ExtractMetadata) and 7 (SubmitToMagpie) are non-fatal; errors are logged and the pipeline continues.
- Step 6 (PersistMetadata) is required; failure aborts the pipeline for that paper.
- Storage: if MINIO_ENDPOINT is unset, upload activities return a stub path and the pipeline still completes.
- Deduplication: if a duplicate is found (by checksum or DOI), the pipeline returns Duplicate: true without storing the file again.
- Status tracking (paper only): the Ingester maintains an in-memory jobs map; status is lost on process restart.
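The backpressure and status behavior above can be sketched with a buffered channel and a mutex-guarded map. Illustrative only, not the real Ingester:

```go
package main

import (
	"fmt"
	"sync"
)

type job struct{ id, url string }

// Ingester sketch: Enqueue blocks when the buffered channel is full,
// applying backpressure to the caller (e.g. a Kafka consumer goroutine),
// and status lives in an in-memory map that is lost on restart.
type Ingester struct {
	jobs chan job
	mu   sync.Mutex
	stat map[string]string
}

func NewIngester(buffer int) *Ingester {
	return &Ingester{jobs: make(chan job, buffer), stat: make(map[string]string)}
}

func (in *Ingester) Enqueue(id, url string) {
	in.setStatus(id, "running")
	in.jobs <- job{id, url} // blocks when the buffer is full: no drops
}

func (in *Ingester) Status(id string) string {
	in.mu.Lock()
	defer in.mu.Unlock()
	return in.stat[id]
}

func (in *Ingester) setStatus(id, s string) {
	in.mu.Lock()
	defer in.mu.Unlock()
	in.stat[id] = s
}

// Start drains the channel, running the pipeline for each job and
// recording the terminal status.
func (in *Ingester) Start(run func(job) error) {
	for j := range in.jobs {
		if err := run(j); err != nil {
			in.setStatus(j.id, "failed")
			continue
		}
		in.setStatus(j.id, "completed")
	}
}

func main() {
	in := NewIngester(4)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); in.Start(func(job) error { return nil }) }()
	in.Enqueue("owl-ingest-url-deadbeef", "https://example.com/p.pdf")
	close(in.jobs)
	wg.Wait()
	fmt.Println(in.Status("owl-ingest-url-deadbeef"))
}
```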
## Infrastructure (docker-compose)

| Service | Port | Purpose |
|---|---|---|
| db (postgres:15-alpine) | internal | Owl PostgreSQL database |
| minio | 9010 (API), 9011 (console) | Object storage |
| redpanda-0 | 19092 | Kafka-compatible broker |
| api | 9000 | API server |
| worker | — | Ingestion pipelines + Kafka consumers |
## Environment Variables

### API Server (cmd/api)

| Variable | Description |
|---|---|
| DATABASE_URL | PostgreSQL connection string |
| KAFKA_BROKERS | Kafka broker address |
### Worker (cmd/worker)

| Variable | Description |
|---|---|
| DATABASE_URL | PostgreSQL connection string |
| KAFKA_BROKERS | Kafka broker address |
| MINIO_ENDPOINT | MinIO/S3 endpoint; archiving skipped if unset |
| MINIO_ACCESS_KEY | MinIO access key |
| MINIO_SECRET_KEY | MinIO secret key |
| MINIO_BUCKET | Bucket name (default: owl) |