Owl
Architecture
Overview
Owl is a Go service (module github.com/holmes89/owl) that provides a personal digital library for books and academic papers. Three runtime processes share the same Go module:
| Process | Entry point | Role |
|---|---|---|
| API server | cmd/api/main.go | Serves Connect-RPC on :9000; enqueues ingestion requests |
| Worker | cmd/worker/main.go | Runs ingestion pipelines + Kafka consumers |
| CLI | main.go → cmd/ | Cobra CLI client speaking gRPC to localhost:9000 |
Process Architecture
┌──────────────────────────────────────────────────┐
│ Client (CLI / curl / UI)                         │
└────────────────────────┬─────────────────────────┘
                         │ Connect-RPC HTTP/2 h2c :9000
┌────────────────────────▼─────────────────────────┐
│ API Server (cmd/api/main.go)                     │
│                                                  │
│   BookService gRPC handler                       │
│   PaperService gRPC handler                      │
│    ├─ IngestPaperByURL   → Ingester.Enqueue()    │
│    └─ GetIngestionStatus → Ingester.Status()     │
│                                                  │
│   Ingester (buffered channel, goroutine)         │
│   PostgreSQL (lib/repo)                          │
│   Kafka client [producer: nil]                   │
└────────────────────────┬─────────────────────────┘
                         │ Kafka topics: owl.v1.Paper, owl.v1.Book
┌────────────────────────▼─────────────────────────┐
│ Worker (cmd/worker/main.go)                      │
│                                                  │
│   Kafka consumers: paper, book                   │
│    ├─ message.source_url != "" → Enqueue         │
│    └─ otherwise → direct Create                  │
│                                                  │
│   Ingester (buffered channel, goroutine)         │
│    └─ runPaperPipeline (7 steps)                 │
│   BookIngester (buffered channel, goroutine)     │
│    └─ runBookPipeline (4 steps)                  │
│                                                  │
│   PostgreSQL (lib/repo)                          │
│   MinIO storage (lib/storage)                    │
│   Metadata clients (lib/owl/metadata)            │
│    CompositeClient ─┬─ ArxivClient               │
│                     └─ CrossRefClient            │
└──────────────────────────────────────────────────┘
Ingestion Pipeline Detail
Paper Pipeline (runPaperPipeline, 7 steps)
| Step | Function | Notes |
|---|---|---|
| 1 | ResolveSourceActivity | arXiv/DOI detection; optional MetadataClient fetch |
| 2 | DownloadFileActivity | HTTP GET → temp file; SHA-256 checksum |
| 3 | ExtractMetadataActivity | stub (returns "en"); non-fatal |
| 4 | DeduplicateActivity | FindByChecksum + FindByDOI; short-circuits if duplicate |
| 5 | UploadToStorageActivity | uploads to papers/{checksum}; no-op if Storage == nil |
| 6 | PersistMetadataActivity | PaperRepo.Create; required |
| 7 | SubmitToMagpieActivity | stub; non-fatal |
Book Pipeline (runBookPipeline, 4 steps)
| Step | Function | Notes |
|---|---|---|
| 1 | DownloadFileActivity | HTTP GET → temp file; SHA-256 checksum |
| 2 | DeduplicateBookActivity | FindByChecksum only |
| 3 | UploadBookToStorageActivity | uploads to books/{checksum} |
| 4 | PersistBookActivity | BookRepo.Create; required |
Request ID Strategy
Deterministic IDs prevent duplicate concurrent ingestion and correlate API calls to status queries:
| URL type | Request ID |
|---|---|
| arxiv.org/abs/{id} | owl-ingest-arxiv-{id} |
| doi.org/... | owl-ingest-doi-{sha256[:8]} |
| Other | owl-ingest-url-{sha256[:8]} |
Ingester.Enqueue marks the request as running; once the pipeline finishes, Start updates the status to completed, failed, or duplicate. Status is kept in memory only, so it is lost across restarts.
Dependency Injection
cmd/api/main.go
repo.NewDatabase(conn)
 ├─ repo.BookRepo  → book.NewBookService  → bookgrpc.NewBookService(svc, nil)
 └─ repo.PaperRepo → paper.NewPaperService → papergrpc.NewPaperService(svc, nil)
      └─ .WithIngester(ingester)
ingestion.Activities{PaperRepo, BookRepo, MetadataClient}
 → ingestion.NewIngester(acts, 0) → go ingester.Start(ctx)
cmd/worker/main.go
repo.NewDatabase(conn)
 ├─ repo.BookRepo  → book.NewBookService  → book.NewBookConsumer(kafka, svc, bookIngester)
 └─ repo.PaperRepo → paper.NewPaperService → paper.NewPaperConsumer(kafka, svc, ingester)
ingestion.Activities{PaperRepo, BookRepo, MetadataClient, Storage}
 ├─ ingestion.NewIngester(acts, 0)     → go ingester.Start(ctx)
 └─ ingestion.NewBookIngester(acts, 0) → go bookIngester.Start(ctx)
Concurrency and Error Handling
- Enqueue blocks when the channel is full, so the Kafka consumer goroutine provides natural backpressure; no messages are silently dropped.
- Steps 3 (ExtractMetadata) and 7 (SubmitToMagpie) are non-fatal; errors are logged and the pipeline continues.
- Step 6 (PersistMetadata) is required; failure aborts the pipeline for that paper.
- Storage: if MINIO_ENDPOINT is unset, upload activities return a stub path; the pipeline still completes.
- Deduplication: if a duplicate is found (by checksum or DOI), the pipeline returns Duplicate: true without storing again.
- Status tracking (paper only): Ingester maintains an in-memory jobs map; status is lost on process restart.
Infrastructure (docker-compose)
| Service | Port | Purpose |
|---|---|---|
| db (postgres:15-alpine) | internal | Owl PostgreSQL database |
| minio | 9010 (API), 9011 (console) | Object storage |
| redpanda-0 | 19092 | Kafka-compatible broker |
| api | 9000 | API server |
| worker | n/a | Ingestion pipelines + Kafka consumers |
Environment Variables
API Server (cmd/api)
| Variable | Description |
|---|---|
| DATABASE_URL | PostgreSQL connection string |
| KAFKA_BROKERS | Kafka broker address |
Worker (cmd/worker)
| Variable | Description |
|---|---|
| DATABASE_URL | PostgreSQL connection string |
| KAFKA_BROKERS | Kafka broker address |
| MINIO_ENDPOINT | MinIO/S3 endpoint; archiving skipped if unset |
| MINIO_ACCESS_KEY | MinIO access key |
| MINIO_SECRET_KEY | MinIO secret key |
| MINIO_BUCKET | Bucket name (default: owl) |