Architecture

Overview

Owl is a Go service (module github.com/holmes89/owl) that provides a personal digital library for books and academic papers. Three runtime processes share the same Go module:

| Process | Entry point | Role |
|---|---|---|
| API server | cmd/api/main.go | Serves Connect-RPC on :9000; enqueues ingestion requests |
| Worker | cmd/worker/main.go | Runs ingestion pipelines + Kafka consumers |
| CLI | main.go → cmd/ | Cobra CLI client speaking gRPC to localhost:9000 |

Process Architecture

┌────────────────────────────────────────────────┐
│  Client (CLI / curl / UI)                      │
└────────────────┬───────────────────────────────┘
                 │ Connect-RPC HTTP/2 h2c :9000
┌────────────────▼───────────────────────────────┐
│  API Server (cmd/api/main.go)                  │
│                                                │
│  BookService gRPC handler                      │
│  PaperService gRPC handler                     │
│    └─ IngestPaperByURL → Ingester.Enqueue()    │
│    └─ GetIngestionStatus → Ingester.Status()   │
│                                                │
│  Ingester (buffered channel, goroutine)        │
│  PostgreSQL (lib/repo)                         │
│  Kafka client [producer: nil]                  │
└────────────────┬───────────────────────────────┘
                 │ Kafka topics: owl.v1.Paper, owl.v1.Book
┌────────────────▼───────────────────────────────┐
│  Worker (cmd/worker/main.go)                   │
│                                                │
│  Kafka consumers: paper, book                  │
│    └─ message.source_url != "" → Enqueue       │
│    └─ otherwise → direct Create                │
│                                                │
│  Ingester (buffered channel, goroutine)        │
│    runPaperPipeline (7 steps)                  │
│  BookIngester (buffered channel, goroutine)    │
│    runBookPipeline (4 steps)                   │
│                                                │
│  PostgreSQL (lib/repo)                         │
│  MinIO storage (lib/storage)                   │
│  Metadata clients (lib/owl/metadata)           │
│    CompositeClient → ArxivClient               │
│                    → CrossRefClient            │
└────────────────────────────────────────────────┘

Ingestion Pipeline Detail

Paper Pipeline (runPaperPipeline, 7 steps)

| Step | Function | Notes |
|---|---|---|
| 1 | ResolveSourceActivity | arXiv/DOI detection; optional MetadataClient fetch |
| 2 | DownloadFileActivity | HTTP GET → temp file; SHA-256 checksum |
| 3 | ExtractMetadataActivity | stub (returns en); non-fatal |
| 4 | DeduplicateActivity | FindByChecksum + FindByDOI; short-circuits if duplicate |
| 5 | UploadToStorageActivity | uploads to papers/{checksum}; no-op if Storage == nil |
| 6 | PersistMetadataActivity | PaperRepo.Create (required) |
| 7 | SubmitToMagpieActivity | stub; non-fatal |

Book Pipeline (runBookPipeline, 4 steps)

| Step | Function | Notes |
|---|---|---|
| 1 | DownloadFileActivity | HTTP GET → temp file; SHA-256 checksum |
| 2 | DeduplicateBookActivity | FindByChecksum only |
| 3 | UploadBookToStorageActivity | uploads to books/{checksum} |
| 4 | PersistBookActivityBookRepo | BookRepo.Create (required) |

Request ID Strategy

Deterministic IDs prevent duplicate concurrent ingestion and correlate API calls to status queries:

| URL type | Request ID |
|---|---|
| arxiv.org/abs/{id} | owl-ingest-arxiv-{id} |
| doi.org/... | owl-ingest-doi-{sha256[:8]} |
| Other | owl-ingest-url-{sha256[:8]} |

Ingester.Enqueue marks the request as running; once the pipeline finishes, Start updates the status to completed, failed, or duplicate. Status lives in memory only, so it is lost across restarts.

Dependency Injection

cmd/api/main.go
  repo.NewDatabase(conn)
    → repo.BookRepo → book.NewBookService → bookgrpc.NewBookService(svc, nil)
    → repo.PaperRepo → paper.NewPaperService → papergrpc.NewPaperService(svc, nil)
                                                 └─ .WithIngester(ingester)
  ingestion.Activities{PaperRepo, BookRepo, MetadataClient}
    → ingestion.NewIngester(acts, 0) → go ingester.Start(ctx)

cmd/worker/main.go
  repo.NewDatabase(conn)
    → repo.BookRepo → book.NewBookService → book.NewBookConsumer(kafka, svc, bookIngester)
    → repo.PaperRepo → paper.NewPaperService → paper.NewPaperConsumer(kafka, svc, ingester)
    → ingestion.Activities{PaperRepo, BookRepo, MetadataClient, Storage}
    → ingestion.NewIngester(acts, 0)     → go ingester.Start(ctx)
    → ingestion.NewBookIngester(acts, 0) → go bookIngester.Start(ctx)

Concurrency and Error Handling

  • Enqueue blocks when the channel is full; the Kafka consumer goroutine provides natural backpressure, so no messages are silently dropped.
  • Steps 3 (ExtractMetadata) and 7 (SubmitToMagpie) are non-fatal; errors are logged and the pipeline continues.
  • Step 6 (PersistMetadata) is required; failure aborts the pipeline for that paper.
  • Storage: If MINIO_ENDPOINT is unset, upload activities return a stub path; pipeline still completes.
  • Deduplication: If duplicate found (by checksum or DOI), pipeline returns Duplicate: true without storing again.
  • Status tracking (paper only): Ingester maintains an in-memory jobs map; status is lost on process restart.
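The buffered-channel ingester described above can be sketched as follows. All names here (`ingester`, `Enqueue`, `Start`, `Status`) are illustrative and simplified; they are not the actual ingestion package API:

```go
package main

import (
	"fmt"
	"sync"
)

// ingester mirrors the shape described above: a buffered channel of requests,
// a goroutine draining it, and an in-memory status map (lost on restart).
type ingester struct {
	jobs  chan string
	mu    sync.Mutex
	done  sync.WaitGroup
	state map[string]string
}

func newIngester(buf int) *ingester {
	return &ingester{jobs: make(chan string, buf), state: map[string]string{}}
}

// Enqueue marks the job running, then sends it. The send blocks when the
// channel is full, which is what gives the Kafka consumer backpressure.
func (i *ingester) Enqueue(id string) {
	i.mu.Lock()
	i.state[id] = "running"
	i.mu.Unlock()
	i.jobs <- id
}

// Start drains the channel until it is closed, running the pipeline per job
// and recording the terminal status.
func (i *ingester) Start(pipeline func(string) string) {
	i.done.Add(1)
	go func() {
		defer i.done.Done()
		for id := range i.jobs {
			result := pipeline(id)
			i.mu.Lock()
			i.state[id] = result
			i.mu.Unlock()
		}
	}()
}

func (i *ingester) Status(id string) string {
	i.mu.Lock()
	defer i.mu.Unlock()
	return i.state[id]
}

func main() {
	ing := newIngester(4)
	ing.Start(func(string) string { return "completed" })
	ing.Enqueue("owl-ingest-arxiv-2101.00001")
	close(ing.jobs)
	ing.done.Wait()
	fmt.Println(ing.Status("owl-ingest-arxiv-2101.00001"))
}
```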

Infrastructure (docker-compose)

| Service | Port | Purpose |
|---|---|---|
| db (postgres:15-alpine) | internal | Owl PostgreSQL database |
| minio | 9010 (API), 9011 (console) | Object storage |
| redpanda-0 | 19092 | Kafka-compatible broker |
| api | 9000 | API server |
| worker | (none) | Ingestion pipelines + Kafka consumers |

Environment Variables

API Server (cmd/api)

| Variable | Description |
|---|---|
| DATABASE_URL | PostgreSQL connection string |
| KAFKA_BROKERS | Kafka broker address |

Worker (cmd/worker)

| Variable | Description |
|---|---|
| DATABASE_URL | PostgreSQL connection string |
| KAFKA_BROKERS | Kafka broker address |
| MINIO_ENDPOINT | MinIO/S3 endpoint; archiving skipped if unset |
| MINIO_ACCESS_KEY | MinIO access key |
| MINIO_SECRET_KEY | MinIO secret key |
| MINIO_BUCKET | Bucket name (default: owl) |