# Architecture

## System Context

```mermaid
C4Context
    title System Context — Owl within joel.holmes.haus
    Person(admin, "Admin", "Manages personal library of books and academic papers")
    Boundary(platform, "joel.holmes.haus Platform") {
        System(ui, "joel.holmes.haus", "Go-app WASM admin SPA")
        System(owl, "Owl", "Personal digital library — books and academic papers")
        System(magpie, "Magpie", "Resource hub — receives book and paper resources from Owl")
        System(shrike, "Shrike", "Search — indexes paper/book text via TextExtractedEvent")
    }
    SystemDb(postgres, "PostgreSQL", "Books, papers, book metadata")
    SystemDb(minio, "MinIO / S3", "PDF and book file blobs")
    SystemQueue(kafka, "Kafka", "owl.v1.Paper · owl.v1.Book · magpie.v1.Resource · shrike.v1.TextExtractedEvent")
    System_Ext(arxiv, "arXiv / CrossRef", "Academic paper metadata APIs")
    System_Ext(googlebooks, "Google Books / Open Library", "Book metadata APIs (enrichment)")
    Rel(admin, ui, "Uses")
    Rel(ui, owl, "ConnectRPC")
    Rel(owl, postgres, "Reads / writes")
    Rel(owl, minio, "Stores PDFs and book files")
    Rel(owl, kafka, "Publishes paper and book events")
    Rel(owl, arxiv, "Resolves DOI / arXiv metadata")
    Rel(owl, googlebooks, "Enriches book metadata")
    Rel(kafka, magpie, "magpie.v1.Resource")
    Rel(kafka, shrike, "TextExtractedEvent")
```

## Container Diagram

```mermaid
C4Container
    title Owl — Internal Containers
    Boundary(owl, "Owl") {
        Container(api, "cmd/api", "Go / ConnectRPC h2c :9000", "BookService · PaperService · in-process Ingester goroutine")
        Container(worker, "cmd/worker", "Go / Kafka", "BookConsumer · PaperConsumer · Ingester pipeline · BookIngester pipeline")
        Container(bookSvc, "book.Service", "Go", "CRUD for books")
        Container(paperSvc, "paper.Service", "Go", "CRUD for papers · IngestPaperByURL · GetIngestionStatus")
        Container(ingester, "ingestion.Ingester", "Go / goroutine", "7-step paper pipeline: resolve → download → metadata → dedup → upload → persist → magpie")
        Container(bookIngester, "ingestion.BookIngester", "Go / goroutine", "4-step book pipeline: download → dedup → upload → persist")
        Container(metaClient, "metadata.CompositeClient", "Go / HTTP", "ArxivClient + CrossRefClient for DOI/arXiv resolution")
        ContainerDb(bookRepo, "BookRepo + BookMetadataRepo", "PostgreSQL / squirrel", "books · book_metadata tables")
        ContainerDb(paperRepo, "PaperRepo", "PostgreSQL / squirrel", "papers table")
    }
    SystemDb(postgres, "PostgreSQL", "")
    SystemDb(minio, "MinIO / S3", "papers/CHECKSUM · books/CHECKSUM")
    SystemQueue(kafka, "Kafka", "")
    System_Ext(arxiv, "arXiv / CrossRef", "")
    Rel(api, bookSvc, "delegates")
    Rel(api, paperSvc, "delegates")
    Rel(api, ingester, "Enqueue() / Status()")
    Rel(kafka, worker, "owl.v1.Book · owl.v1.Paper")
    Rel(worker, bookSvc, "BookConsumer")
    Rel(worker, paperSvc, "PaperConsumer")
    Rel(worker, ingester, "Enqueue()")
    Rel(worker, bookIngester, "Enqueue()")
    Rel(ingester, paperRepo, "Create")
    Rel(ingester, minio, "Upload PDF")
    Rel(ingester, kafka, "magpie.v1.Resource")
    Rel(ingester, metaClient, "Resolve DOI / arXiv")
    Rel(bookIngester, bookRepo, "Create")
    Rel(bookIngester, minio, "Upload book file")
    Rel(bookRepo, postgres, "SQL")
    Rel(paperRepo, postgres, "SQL")
```

## Overview

Owl is a Go service (module github.com/holmes89/owl) that provides a personal digital library for books and academic papers. Three runtime processes share the same Go module:

| Process | Entry point | Role |
|---|---|---|
| API server | `cmd/api/main.go` | Serves Connect-RPC on :9000; enqueues ingestion requests |
| Worker | `cmd/worker/main.go` | Runs ingestion pipelines + Kafka consumers |
| CLI | `main.go` | Cobra CLI client speaking gRPC to localhost:9000 |

## Process Architecture

```mermaid
graph TD
    Client["Client\nCLI · curl · UI"]
    subgraph api_box["cmd/api :9000"]
        BookGRPC["BookService handler"]
        PaperGRPC["PaperService handler\nIngestPaperByURL · GetIngestionStatus"]
        APIIngester["Ingester goroutine\nbuffered channel"]
    end
    subgraph worker_box["cmd/worker"]
        BookConsumer["BookConsumer\nKafka: owl.v1.Book"]
        PaperConsumer["PaperConsumer\nKafka: owl.v1.Paper"]
        WIngester["Ingester goroutine\nrunPaperPipeline 7 steps"]
        WBookIngester["BookIngester goroutine\nrunBookPipeline 4 steps"]
        Meta["CompositeClient\nArxivClient · CrossRefClient"]
    end
    Client -->|"Connect-RPC HTTP/2"| BookGRPC
    Client -->|"Connect-RPC HTTP/2"| PaperGRPC
    PaperGRPC -->|"Enqueue() / Status()"| APIIngester
    BookGRPC -->|"owl.v1.Book"| Kafka[("Kafka")]
    PaperGRPC -->|"owl.v1.Paper"| Kafka
    Kafka --> BookConsumer
    Kafka --> PaperConsumer
    BookConsumer -->|"Enqueue()"| WBookIngester
    PaperConsumer -->|"Enqueue() or direct Create"| WIngester
    WIngester --> Meta
    WIngester --> MinIO[("MinIO / S3")]
    WIngester --> PG[("PostgreSQL")]
    WBookIngester --> MinIO
    WBookIngester --> PG
    APIIngester --> Meta
    APIIngester --> PG
```

## Ingestion Pipeline Detail

### Paper Pipeline (`runPaperPipeline` — 7 steps)

| Step | Function | Notes |
|---|---|---|
| 1 | `ResolveSourceActivity` | arXiv/DOI detection; optional MetadataClient fetch |
| 2 | `DownloadFileActivity` | HTTP GET → temp file; SHA-256 checksum |
| 3 | `ExtractMetadataActivity` | stub (returns `en`); non-fatal |
| 4 | `DeduplicateActivity` | FindByChecksum + FindByDOI; short-circuits if duplicate |
| 5 | `UploadToStorageActivity` | uploads to `papers/{checksum}`; no-op if Storage == nil |
| 6 | `PersistMetadataActivity` | `PaperRepo.Create` — required |
| 7 | `SubmitToMagpieActivity` | stub — non-fatal |

### Book Pipeline (`runBookPipeline` — 4 steps)

| Step | Function | Notes |
|---|---|---|
| 1 | `DownloadFileActivity` | HTTP GET → temp file; SHA-256 checksum |
| 2 | `DeduplicateBookActivity` | FindByChecksum only |
| 3 | `UploadBookToStorageActivity` | uploads to `books/{checksum}` |
| 4 | `PersistBookActivity` | `BookRepo.Create` — required |

## Request ID Strategy

Deterministic IDs prevent duplicate concurrent ingestion and correlate API calls to status queries:

| URL type | Request ID |
|---|---|
| `arxiv.org/abs/{id}` | `owl-ingest-arxiv-{id}` |
| `doi.org/...` | `owl-ingest-doi-{sha256[:8]}` |
| Other | `owl-ingest-url-{sha256[:8]}` |

`Ingester.Enqueue` marks the request as `running`; after the pipeline finishes, `Start` updates the status to `completed`, `failed`, or `duplicate`. Status is held in memory only, so it is lost across process restarts.

## Dependency Injection

```mermaid
graph LR
    subgraph api_di["cmd/api/main.go"]
        DB["repo.NewDatabase"] --> BookRepo --> BookSvc["book.NewBookService"] --> BookGRPC["bookgrpc.NewBookService"]
        DB --> PaperRepo --> PaperSvc["paper.NewPaperService"] --> PaperGRPC["papergrpc.NewPaperService\n.WithIngester()"]
        Acts["ingestion.Activities\nPaperRepo · BookRepo · MetadataClient"] --> Ing["ingestion.NewIngester"] -->|"go Start()"| IngR["Ingester running"]
    end
    subgraph worker_di["cmd/worker/main.go"]
        WDB["repo.NewDatabase"] --> WBookRepo --> WBookSvc["book.NewBookService"] --> BookCon["book.NewBookConsumer\nkafka · svc · bookIngester"]
        WDB --> WPaperRepo --> WPaperSvc["paper.NewPaperService"] --> PaperCon["paper.NewPaperConsumer\nkafka · svc · ingester"]
        WActs["ingestion.Activities\n+ MetadataClient + Storage"] --> WIng["ingestion.NewIngester"] -->|"go Start()"| WIngR["Ingester running"]
        WActs --> WBIng["ingestion.NewBookIngester"] -->|"go Start()"| WBIngR["BookIngester running"]
    end
```

## Concurrency and Error Handling

- `Enqueue` blocks when the channel is full; the Kafka consumer goroutine provides natural backpressure, and no messages are silently dropped.
- Steps 3 (`ExtractMetadata`) and 7 (`SubmitToMagpie`) are non-fatal; errors are logged and the pipeline continues.
- Step 6 (`PersistMetadata`) is required; failure aborts the pipeline for that paper.
- **Storage:** if `MINIO_ENDPOINT` is unset, upload activities return a stub path; the pipeline still completes.
- **Deduplication:** if a duplicate is found (by checksum or DOI), the pipeline returns `Duplicate: true` without storing again.
- **Status tracking (paper only):** the `Ingester` maintains an in-memory jobs map; status is lost on process restart.
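The blocking-channel backpressure and in-memory status map can be sketched together. This is a minimal illustration of the pattern, assuming a string status; the `Ingester` fields, `NewIngester`, and the `run` callback here are hypothetical, not Owl's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// Ingester sketches the concurrency model: a buffered channel for
// backpressure plus a mutex-guarded in-memory status map.
type Ingester struct {
	mu    sync.Mutex
	jobs  map[string]string // requestID -> running | completed | failed
	queue chan string
	run   func(requestID string) error
}

func NewIngester(buffer int, run func(string) error) *Ingester {
	return &Ingester{
		jobs:  make(map[string]string),
		queue: make(chan string, buffer),
		run:   run,
	}
}

// Enqueue marks the job running, then sends on the buffered channel.
// The send blocks when the buffer is full, so a Kafka consumer calling
// Enqueue slows down instead of dropping messages.
func (i *Ingester) Enqueue(id string) {
	i.setStatus(id, "running")
	i.queue <- id
}

// Start drains the queue until it is closed, recording a terminal status.
func (i *Ingester) Start() {
	for id := range i.queue {
		if err := i.run(id); err != nil {
			i.setStatus(id, "failed")
			continue
		}
		i.setStatus(id, "completed")
	}
}

func (i *Ingester) setStatus(id, s string) {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.jobs[id] = s
}

func (i *Ingester) Status(id string) string {
	i.mu.Lock()
	defer i.mu.Unlock()
	return i.jobs[id]
}

func main() {
	ing := NewIngester(1, func(string) error { return nil })
	ing.Enqueue("owl-ingest-arxiv-x")
	close(ing.queue)
	ing.Start() // run synchronously here; the real service launches `go Start()`
	fmt.Println(ing.Status("owl-ingest-arxiv-x")) // completed
}
```

Because the map lives in process memory, a restart wipes all statuses, which matches the "ephemeral across restarts" behavior noted above.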

## Infrastructure (docker-compose)

| Service | Port | Purpose |
|---|---|---|
| db (postgres:15-alpine) | internal | Owl PostgreSQL database |
| minio | 9010 (API), 9011 (console) | Object storage |
| redpanda-0 | 19092 | Kafka-compatible broker |
| api | 9000 | API server |
| worker | (none) | Ingestion pipelines + Kafka consumers |

## Environment Variables

### API Server (`cmd/api`)

| Variable | Description |
|---|---|
| `DATABASE_URL` | PostgreSQL connection string |
| `KAFKA_BROKERS` | Kafka broker address |

### Worker (`cmd/worker`)

| Variable | Description |
|---|---|
| `DATABASE_URL` | PostgreSQL connection string |
| `KAFKA_BROKERS` | Kafka broker address |
| `MINIO_ENDPOINT` | MinIO/S3 endpoint; archiving skipped if unset |
| `MINIO_ACCESS_KEY` | MinIO access key |
| `MINIO_SECRET_KEY` | MinIO secret key |
| `MINIO_BUCKET` | Bucket name (default: `owl`) |