From 838d8892f06fdfd0bb0c5300d7fae91c2318e7df Mon Sep 17 00:00:00 2001 From: Philipinho <16838612+Philipinho@users.noreply.github.com> Date: Thu, 23 Apr 2026 16:10:32 +0100 Subject: [PATCH] feat(base): minimal async connection pool for duckdb reader pool --- .../base/query-cache/connection-pool.spec.ts | 75 ++++++++++++++++ .../core/base/query-cache/connection-pool.ts | 86 +++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 apps/server/src/core/base/query-cache/connection-pool.spec.ts create mode 100644 apps/server/src/core/base/query-cache/connection-pool.ts diff --git a/apps/server/src/core/base/query-cache/connection-pool.spec.ts b/apps/server/src/core/base/query-cache/connection-pool.spec.ts new file mode 100644 index 00000000..0e4fa86a --- /dev/null +++ b/apps/server/src/core/base/query-cache/connection-pool.spec.ts @@ -0,0 +1,75 @@ +import { ConnectionPool } from './connection-pool'; + +describe('ConnectionPool', () => { + it('hands out an available resource immediately', async () => { + const pool = new ConnectionPool(); + pool.init(['a', 'b']); + expect(await pool.acquire()).toBe('b'); + expect(await pool.acquire()).toBe('a'); + }); + + it('a waiter is resolved by the next release', async () => { + const pool = new ConnectionPool(); + pool.init(['only']); + const first = await pool.acquire(); + let resolved: string | null = null; + const secondP = pool.acquire().then((v) => (resolved = v)); + expect(resolved).toBeNull(); + pool.release(first); + await secondP; + expect(resolved).toBe('only'); + }); + + it('FIFO among waiters (fair under contention)', async () => { + const pool = new ConnectionPool(); + pool.init(['only']); + const held = await pool.acquire(); + + const order: number[] = []; + const p1 = pool.acquire().then(() => order.push(1)); + const p2 = pool.acquire().then(() => order.push(2)); + const p3 = pool.acquire().then(() => order.push(3)); + + pool.release(held); + await p1; + pool.release('only'); // re-release the value that p1 got (simulated) + await p2; + pool.release('only'); + await p3; + + expect(order).toEqual([1, 2, 3]); + }); + + it('withResource acquires, invokes callback, and releases even on throw', async () => { + const pool = new ConnectionPool(); + pool.init(['one']); + let called = false; + await expect( + pool.withResource(async (v) => { + called = true; + expect(v).toBe('one'); + throw new Error('boom'); + }), + ).rejects.toThrow('boom'); + expect(called).toBe(true); + // resource should be back in the pool + expect(await pool.acquire()).toBe('one'); + }); + + it('size() reports the initial count regardless of check-outs', () => { + const pool = new ConnectionPool(); + pool.init(['a', 'b', 'c']); + expect(pool.size()).toBe(3); + }); + + it('close() returns all held resources and rejects pending waiters', async () => { + const pool = new ConnectionPool(); + pool.init(['only']); + const first = await pool.acquire(); + const pending = pool.acquire(); + pending.catch(() => {}); // Attach catch to prevent unhandled rejection + const closed = pool.close(); + expect(closed).toEqual([]); // No free resources (one is checked out) + await expect(pending).rejects.toThrow(/closed/i); + }); +}); diff --git a/apps/server/src/core/base/query-cache/connection-pool.ts b/apps/server/src/core/base/query-cache/connection-pool.ts new file mode 100644 index 00000000..7b102d04 --- /dev/null +++ b/apps/server/src/core/base/query-cache/connection-pool.ts @@ -0,0 +1,86 @@ +type Waiter = { + resolve: (value: T) => void; + reject: (err: Error) => void; +}; + +/* + * A minimal async resource pool. No external deps. Semantics: + * + * - `acquire()` returns an available resource immediately, or a Promise + * that resolves when one is released. + * - `release(r)` returns a resource. If there are pending waiters, hands + * to the FIFO-first one. Otherwise returns to the free list. + * - `withResource(fn)` acquires, invokes, and releases — releases even + * if `fn` throws. + * - `close()` rejects all pending waiters and returns the currently-free + * resources so the owner can release them. Already-checked-out + * resources are the caller's responsibility to finish with and re-release + * (they'll get a no-op release, the pool being closed). + * + * Initial size is set via `init(resources)`. Resources must not be checked + * out before `init` is called. `size()` reports the canonical count (does + * not decrement on acquire). + */ +export class ConnectionPool { + private free: T[] = []; + private waiters: Waiter[] = []; + private initialCount = 0; + private closed = false; + + init(resources: T[]): void { + if (this.initialCount !== 0) { + throw new Error('ConnectionPool already initialised'); + } + this.free = [...resources]; + this.initialCount = resources.length; + } + + size(): number { + return this.initialCount; + } + + async acquire(): Promise { + if (this.closed) { + throw new Error('ConnectionPool is closed'); + } + if (this.free.length > 0) { + return this.free.pop()!; + } + return new Promise((resolve, reject) => { + this.waiters.push({ resolve, reject }); + }); + } + + release(resource: T): void { + if (this.closed) { + // Drop; caller expected this + return; + } + const waiter = this.waiters.shift(); + if (waiter) { + waiter.resolve(resource); + } else { + this.free.push(resource); + } + } + + async withResource(fn: (resource: T) => Promise): Promise { + const resource = await this.acquire(); + try { + return await fn(resource); + } finally { + this.release(resource); + } + } + + close(): T[] { + this.closed = true; + for (const waiter of this.waiters) { + waiter.reject(new Error('ConnectionPool is closed')); + } + this.waiters = []; + const remaining = this.free; + this.free = []; + return remaining; + } +}