feat(base): minimal async connection pool for duckdb reader pool

This commit is contained in:
Philipinho
2026-04-23 16:10:32 +01:00
parent 08711791d6
commit 838d8892f0
2 changed files with 161 additions and 0 deletions
@@ -0,0 +1,75 @@
import { ConnectionPool } from './connection-pool';
describe('ConnectionPool', () => {
it('hands out an available resource immediately', async () => {
const pool = new ConnectionPool<string>();
pool.init(['a', 'b']);
expect(await pool.acquire()).toBe('b');
expect(await pool.acquire()).toBe('a');
});
it('a waiter is resolved by the next release', async () => {
const pool = new ConnectionPool<string>();
pool.init(['only']);
const first = await pool.acquire();
let resolved: string | null = null;
const secondP = pool.acquire().then((v) => (resolved = v));
expect(resolved).toBeNull();
pool.release(first);
await secondP;
expect(resolved).toBe('only');
});
it('FIFO among waiters (fair under contention)', async () => {
const pool = new ConnectionPool<string>();
pool.init(['only']);
const held = await pool.acquire();
const order: number[] = [];
const p1 = pool.acquire().then(() => order.push(1));
const p2 = pool.acquire().then(() => order.push(2));
const p3 = pool.acquire().then(() => order.push(3));
pool.release(held);
await p1;
pool.release('only'); // re-release the value that p1 got (simulated)
await p2;
pool.release('only');
await p3;
expect(order).toEqual([1, 2, 3]);
});
it('withResource acquires, invokes callback, and releases even on throw', async () => {
const pool = new ConnectionPool<string>();
pool.init(['one']);
let called = false;
await expect(
pool.withResource(async (v) => {
called = true;
expect(v).toBe('one');
throw new Error('boom');
}),
).rejects.toThrow('boom');
expect(called).toBe(true);
// resource should be back in the pool
expect(await pool.acquire()).toBe('one');
});
it('size() reports the initial count regardless of check-outs', () => {
const pool = new ConnectionPool<string>();
pool.init(['a', 'b', 'c']);
expect(pool.size()).toBe(3);
});
it('close() returns all held resources and rejects pending waiters', async () => {
const pool = new ConnectionPool<string>();
pool.init(['only']);
const first = await pool.acquire();
const pending = pool.acquire();
pending.catch(() => {}); // Attach catch to prevent unhandled rejection
const closed = pool.close();
expect(closed).toEqual([]); // No free resources (one is checked out)
await expect(pending).rejects.toThrow(/closed/i);
});
});
@@ -0,0 +1,86 @@
type Waiter<T> = {
resolve: (value: T) => void;
reject: (err: Error) => void;
};
/*
* A minimal async resource pool. No external deps. Semantics:
*
* - `acquire()` returns an available resource immediately, or a Promise
* that resolves when one is released.
* - `release(r)` returns a resource. If there are pending waiters, hands
* to the FIFO-first one. Otherwise returns to the free list.
* - `withResource(fn)` acquires, invokes, and releases — releases even
* if `fn` throws.
* - `close()` rejects all pending waiters and returns the currently-free
* resources so the owner can release them. Already-checked-out
* resources are the caller's responsibility to finish with and re-release
* (they'll get a no-op release, the pool being closed).
*
* Initial size is set via `init(resources)`. Resources must not be checked
* out before `init` is called. `size()` reports the canonical count (does
* not decrement on acquire).
*/
export class ConnectionPool<T> {
private free: T[] = [];
private waiters: Waiter<T>[] = [];
private initialCount = 0;
private closed = false;
init(resources: T[]): void {
if (this.initialCount !== 0) {
throw new Error('ConnectionPool already initialised');
}
this.free = [...resources];
this.initialCount = resources.length;
}
size(): number {
return this.initialCount;
}
async acquire(): Promise<T> {
if (this.closed) {
throw new Error('ConnectionPool is closed');
}
if (this.free.length > 0) {
return this.free.pop()!;
}
return new Promise<T>((resolve, reject) => {
this.waiters.push({ resolve, reject });
});
}
release(resource: T): void {
if (this.closed) {
// Drop; caller expected this
return;
}
const waiter = this.waiters.shift();
if (waiter) {
waiter.resolve(resource);
} else {
this.free.push(resource);
}
}
async withResource<R>(fn: (resource: T) => Promise<R>): Promise<R> {
const resource = await this.acquire();
try {
return await fn(resource);
} finally {
this.release(resource);
}
}
close(): T[] {
this.closed = true;
for (const waiter of this.waiters) {
waiter.reject(new Error('ConnectionPool is closed'));
}
this.waiters = [];
const remaining = this.free;
this.free = [];
return remaining;
}
}