feat: upgrade kysely (#17630)

* feat: upgrade kysely

* chore: pr feedback
This commit is contained in:
Jason Rasmussen 2025-04-15 13:26:56 -04:00 committed by GitHub
parent 270d178a2e
commit b710ad36f3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 474 additions and 478 deletions

View file

@ -2,11 +2,9 @@ import { BullModule } from '@nestjs/bullmq';
import { Inject, Module, OnModuleDestroy, OnModuleInit, ValidationPipe } from '@nestjs/common';
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from '@nestjs/core';
import { ScheduleModule, SchedulerRegistry } from '@nestjs/schedule';
import { PostgresJSDialect } from 'kysely-postgres-js';
import { ClsModule } from 'nestjs-cls';
import { KyselyModule } from 'nestjs-kysely';
import { OpenTelemetryModule } from 'nestjs-otel';
import postgres from 'postgres';
import { commands } from 'src/commands';
import { IWorker } from 'src/constants';
import { controllers } from 'src/controllers';
@ -25,6 +23,7 @@ import { teardownTelemetry, TelemetryRepository } from 'src/repositories/telemet
import { services } from 'src/services';
import { AuthService } from 'src/services/auth.service';
import { CliService } from 'src/services/cli.service';
import { getKyselyConfig } from 'src/utils/database';
const common = [...repositories, ...services, GlobalExceptionFilter];
@ -45,19 +44,7 @@ const imports = [
BullModule.registerQueue(...bull.queues),
ClsModule.forRoot(cls.config),
OpenTelemetryModule.forRoot(otel),
KyselyModule.forRoot({
dialect: new PostgresJSDialect({ postgres: postgres(database.config.kysely) }),
log(event) {
if (event.level === 'error') {
console.error('Query failed :', {
durationMs: event.queryDurationMillis,
error: event.error,
sql: event.query.sql,
params: event.query.parameters,
});
}
},
}),
KyselyModule.forRoot(getKyselyConfig(database.config.kysely)),
];
class BaseModule implements OnModuleInit, OnModuleDestroy {

View file

@ -2,7 +2,6 @@
process.env.DB_URL = process.env.DB_URL || 'postgres://postgres:postgres@localhost:5432/immich';
import { Kysely } from 'kysely';
import { PostgresJSDialect } from 'kysely-postgres-js';
import { writeFileSync } from 'node:fs';
import { basename, dirname, extname, join } from 'node:path';
import postgres from 'postgres';
@ -11,6 +10,7 @@ import { DatabaseRepository } from 'src/repositories/database.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import 'src/schema';
import { schemaDiff, schemaFromCode, schemaFromDatabase } from 'src/sql-tools';
import { getKyselyConfig } from 'src/utils/database';
const main = async () => {
const command = process.argv[2];
@ -52,19 +52,7 @@ const run = async (only?: 'kysely' | 'typeorm') => {
const configRepository = new ConfigRepository();
const { database } = configRepository.getEnv();
const logger = new LoggingRepository(undefined, configRepository);
const db = new Kysely<any>({
dialect: new PostgresJSDialect({ postgres: postgres(database.config.kysely) }),
log(event) {
if (event.level === 'error') {
console.error('Query failed :', {
durationMs: event.queryDurationMillis,
error: event.error,
sql: event.query.sql,
params: event.query.parameters,
});
}
},
});
const db = new Kysely<any>(getKyselyConfig(database.config.kysely));
const databaseRepository = new DatabaseRepository(db, logger, configRepository);
await databaseRepository.runMigrations({ only });

View file

@ -4,13 +4,11 @@ import { Reflector } from '@nestjs/core';
import { SchedulerRegistry } from '@nestjs/schedule';
import { Test } from '@nestjs/testing';
import { ClassConstructor } from 'class-transformer';
import { PostgresJSDialect } from 'kysely-postgres-js';
import { ClsModule } from 'nestjs-cls';
import { KyselyModule } from 'nestjs-kysely';
import { OpenTelemetryModule } from 'nestjs-otel';
import { mkdir, rm, writeFile } from 'node:fs/promises';
import { join } from 'node:path';
import postgres from 'postgres';
import { format } from 'sql-formatter';
import { GENERATE_SQL_KEY, GenerateSqlQueries } from 'src/decorators';
import { repositories } from 'src/repositories';
@ -18,6 +16,11 @@ import { AccessRepository } from 'src/repositories/access.repository';
import { ConfigRepository } from 'src/repositories/config.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { AuthService } from 'src/services/auth.service';
import { getKyselyConfig } from 'src/utils/database';
const handleError = (label: string, error: Error | any) => {
console.error(`${label} error: ${error}`);
};
export class SqlLogger {
queries: string[] = [];
@ -75,7 +78,7 @@ class SqlGenerator {
const moduleFixture = await Test.createTestingModule({
imports: [
KyselyModule.forRoot({
dialect: new PostgresJSDialect({ postgres: postgres(database.config.kysely) }),
...getKyselyConfig(database.config.kysely),
log: (event) => {
if (event.level === 'query') {
this.sqlLogger.logQuery(event.query.sql);
@ -135,7 +138,7 @@ class SqlGenerator {
queries.push({ params: [] });
}
for (const { name, params } of queries) {
for (const { name, params, stream } of queries) {
let queryLabel = `${label}.${key}`;
if (name) {
queryLabel += ` (${name})`;
@ -143,8 +146,19 @@ class SqlGenerator {
this.sqlLogger.clear();
// errors still generate sql, which is all we care about
await target.apply(instance, params).catch((error: Error) => console.error(`${queryLabel} error: ${error}`));
if (stream) {
try {
const result: AsyncIterableIterator<unknown> = target.apply(instance, params);
for await (const _ of result) {
break;
}
} catch (error) {
handleError(queryLabel, error);
}
} else {
// errors still generate sql, which is all we care about
await target.apply(instance, params).catch((error: Error) => handleError(queryLabel, error));
}
if (this.sqlLogger.queries.length === 0) {
console.warn(`No queries recorded for ${queryLabel}`);

View file

@ -123,6 +123,7 @@ export const GENERATE_SQL_KEY = 'generate-sql-key';
export interface GenerateSqlQueries {
name?: string;
params: unknown[];
stream?: boolean;
}
export const Telemetry = (options: { enabled?: boolean }) =>

View file

@ -58,3 +58,53 @@ where
"assets"."id" = $1::uuid
limit
$2
-- AssetJobRepository.getForStorageTemplateJob
select
"assets"."id",
"assets"."ownerId",
"assets"."type",
"assets"."checksum",
"assets"."originalPath",
"assets"."isExternal",
"assets"."sidecarPath",
"assets"."originalFileName",
"assets"."livePhotoVideoId",
"assets"."fileCreatedAt",
"exif"."timeZone",
"exif"."fileSizeInByte"
from
"assets"
inner join "exif" on "assets"."id" = "exif"."assetId"
where
"assets"."deletedAt" is null
and "assets"."id" = $1
-- AssetJobRepository.streamForStorageTemplateJob
select
"assets"."id",
"assets"."ownerId",
"assets"."type",
"assets"."checksum",
"assets"."originalPath",
"assets"."isExternal",
"assets"."sidecarPath",
"assets"."originalFileName",
"assets"."livePhotoVideoId",
"assets"."fileCreatedAt",
"exif"."timeZone",
"exif"."fileSizeInByte"
from
"assets"
inner join "exif" on "assets"."id" = "exif"."assetId"
where
"assets"."deletedAt" is null
-- AssetJobRepository.streamForDeletedJob
select
"id",
"isOffline"
from
"assets"
where
"assets"."deletedAt" <= $1

View file

@ -0,0 +1,248 @@
-- NOTE: This file is auto generated by ./sql-generator
-- SyncRepository.getCheckpoints
select
"type",
"ack"
from
"session_sync_checkpoints"
where
"sessionId" = $1
-- SyncRepository.deleteCheckpoints
delete from "session_sync_checkpoints"
where
"sessionId" = $1
-- SyncRepository.getUserUpserts
select
"id",
"name",
"email",
"deletedAt",
"updateId"
from
"users"
where
"updatedAt" < now() - interval '1 millisecond'
order by
"updateId" asc
-- SyncRepository.getUserDeletes
select
"id",
"userId"
from
"users_audit"
where
"deletedAt" < now() - interval '1 millisecond'
order by
"id" asc
-- SyncRepository.getPartnerUpserts
select
"sharedById",
"sharedWithId",
"inTimeline",
"updateId"
from
"partners"
where
(
"sharedById" = $1
or "sharedWithId" = $2
)
and "updatedAt" < now() - interval '1 millisecond'
order by
"updateId" asc
-- SyncRepository.getPartnerDeletes
select
"id",
"sharedById",
"sharedWithId"
from
"partners_audit"
where
(
"sharedById" = $1
or "sharedWithId" = $2
)
and "deletedAt" < now() - interval '1 millisecond'
order by
"id" asc
-- SyncRepository.getAssetUpserts
select
"id",
"ownerId",
"thumbhash",
"checksum",
"fileCreatedAt",
"fileModifiedAt",
"localDateTime",
"type",
"deletedAt",
"isFavorite",
"isVisible",
"updateId"
from
"assets"
where
"ownerId" = $1
and "updatedAt" < now() - interval '1 millisecond'
order by
"updateId" asc
-- SyncRepository.getPartnerAssetsUpserts
select
"id",
"ownerId",
"thumbhash",
"checksum",
"fileCreatedAt",
"fileModifiedAt",
"localDateTime",
"type",
"deletedAt",
"isFavorite",
"isVisible",
"updateId"
from
"assets"
where
"ownerId" in (
select
"sharedById"
from
"partners"
where
"sharedWithId" = $1
)
and "updatedAt" < now() - interval '1 millisecond'
order by
"updateId" asc
-- SyncRepository.getAssetDeletes
select
"id",
"assetId"
from
"assets_audit"
where
"ownerId" = $1
and "deletedAt" < now() - interval '1 millisecond'
order by
"id" asc
-- SyncRepository.getPartnerAssetDeletes
select
"id",
"assetId"
from
"assets_audit"
where
"ownerId" in (
select
"sharedById"
from
"partners"
where
"sharedWithId" = $1
)
and "deletedAt" < now() - interval '1 millisecond'
order by
"id" asc
-- SyncRepository.getAssetExifsUpserts
select
"exif"."assetId",
"exif"."description",
"exif"."exifImageWidth",
"exif"."exifImageHeight",
"exif"."fileSizeInByte",
"exif"."orientation",
"exif"."dateTimeOriginal",
"exif"."modifyDate",
"exif"."timeZone",
"exif"."latitude",
"exif"."longitude",
"exif"."projectionType",
"exif"."city",
"exif"."state",
"exif"."country",
"exif"."make",
"exif"."model",
"exif"."lensModel",
"exif"."fNumber",
"exif"."focalLength",
"exif"."iso",
"exif"."exposureTime",
"exif"."profileDescription",
"exif"."rating",
"exif"."fps",
"exif"."updateId"
from
"exif"
where
"assetId" in (
select
"id"
from
"assets"
where
"ownerId" = $1
)
and "updatedAt" < now() - interval '1 millisecond'
order by
"updateId" asc
-- SyncRepository.getPartnerAssetExifsUpserts
select
"exif"."assetId",
"exif"."description",
"exif"."exifImageWidth",
"exif"."exifImageHeight",
"exif"."fileSizeInByte",
"exif"."orientation",
"exif"."dateTimeOriginal",
"exif"."modifyDate",
"exif"."timeZone",
"exif"."latitude",
"exif"."longitude",
"exif"."projectionType",
"exif"."city",
"exif"."state",
"exif"."country",
"exif"."make",
"exif"."model",
"exif"."lensModel",
"exif"."fNumber",
"exif"."focalLength",
"exif"."iso",
"exif"."exposureTime",
"exif"."profileDescription",
"exif"."rating",
"exif"."fps",
"exif"."updateId"
from
"exif"
where
"assetId" in (
select
"id"
from
"assets"
where
"ownerId" in (
select
"sharedById"
from
"partners"
where
"sharedWithId" = $1
)
)
and "updatedAt" < now() - interval '1 millisecond'
order by
"updateId" asc

View file

@ -75,16 +75,19 @@ export class AssetJobRepository {
.where('assets.deletedAt', 'is', null);
}
@GenerateSql({ params: [DummyValue.UUID] })
getForStorageTemplateJob(id: string): Promise<StorageAsset | undefined> {
return this.storageTemplateAssetQuery().where('assets.id', '=', id).executeTakeFirst() as Promise<
StorageAsset | undefined
>;
}
@GenerateSql({ params: [], stream: true })
streamForStorageTemplateJob() {
return this.storageTemplateAssetQuery().stream() as AsyncIterableIterator<StorageAsset>;
}
@GenerateSql({ params: [DummyValue.DATE], stream: true })
streamForDeletedJob(trashedBefore: Date) {
return this.db
.selectFrom('assets')

View file

@ -9,7 +9,6 @@ import { CLS_ID, ClsModuleOptions } from 'nestjs-cls';
import { OpenTelemetryModuleOptions } from 'nestjs-otel/lib/interfaces';
import { join, resolve } from 'node:path';
import { parse } from 'pg-connection-string';
import { Notice } from 'postgres';
import { citiesFile, excludePaths, IWorker } from 'src/constants';
import { Telemetry } from 'src/decorators';
import { EnvDto } from 'src/dtos/env.dto';
@ -23,23 +22,10 @@ import {
QueueName,
} from 'src/enum';
import { DatabaseConnectionParams, VectorExtension } from 'src/types';
import { isValidSsl, PostgresConnectionConfig } from 'src/utils/database';
import { setDifference } from 'src/utils/set';
import { PostgresConnectionOptions } from 'typeorm/driver/postgres/PostgresConnectionOptions.js';
type Ssl = 'require' | 'allow' | 'prefer' | 'verify-full' | boolean | object;
type PostgresConnectionConfig = {
host?: string;
password?: string;
user?: string;
port?: number;
database?: string;
client_encoding?: string;
ssl?: Ssl;
application_name?: string;
fallback_application_name?: string;
options?: string;
};
export interface EnvData {
host?: string;
port: number;
@ -144,9 +130,6 @@ const asSet = <T>(value: string | undefined, defaults: T[]) => {
return new Set(values.length === 0 ? defaults : (values as T[]));
};
const isValidSsl = (ssl?: string | boolean | object): ssl is Ssl =>
typeof ssl !== 'string' || ssl === 'require' || ssl === 'allow' || ssl === 'prefer' || ssl === 'verify-full';
const getEnv = (): EnvData => {
const dto = plainToInstance(EnvDto, process.env);
const errors = validateSync(dto);
@ -233,33 +216,6 @@ const getEnv = (): EnvData => {
};
}
const driverOptions = {
...parsedOptions,
onnotice: (notice: Notice) => {
if (notice['severity'] !== 'NOTICE') {
console.warn('Postgres notice:', notice);
}
},
max: 10,
types: {
date: {
to: 1184,
from: [1082, 1114, 1184],
serialize: (x: Date | string) => (x instanceof Date ? x.toISOString() : x),
parse: (x: string) => new Date(x),
},
bigint: {
to: 20,
from: [20, 1700],
parse: (value: string) => Number.parseInt(value),
serialize: (value: number) => value.toString(),
},
},
connection: {
TimeZone: 'UTC',
},
};
return {
host: dto.IMMICH_HOST,
port: dto.IMMICH_PORT || 2283,
@ -325,7 +281,7 @@ const getEnv = (): EnvData => {
parseInt8: true,
...(databaseUrl ? { connectionType: 'url', url: databaseUrl } : parts),
},
kysely: driverOptions,
kysely: parsedOptions,
},
skipMigrations: dto.DB_SKIP_MIGRATIONS ?? false,

View file

@ -1,5 +1,5 @@
import { Injectable } from '@nestjs/common';
import { Kysely, OrderByDirectionExpression, sql } from 'kysely';
import { Kysely, OrderByDirection, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { randomUUID } from 'node:crypto';
import { DB } from 'src/db';
@ -223,7 +223,7 @@ export class SearchRepository {
],
})
async searchMetadata(pagination: SearchPaginationOptions, options: AssetSearchOptions): Paginated<AssetEntity> {
const orderDirection = (options.orderDirection?.toLowerCase() || 'desc') as OrderByDirectionExpression;
const orderDirection = (options.orderDirection?.toLowerCase() || 'desc') as OrderByDirection;
const items = await searchAssetBuilder(this.db, options)
.orderBy('assets.fileCreatedAt', orderDirection)
.limit(pagination.size + 1)

View file

@ -3,6 +3,7 @@ import { Insertable, Kysely, SelectQueryBuilder, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { columns } from 'src/database';
import { DB, SessionSyncCheckpoints } from 'src/db';
import { DummyValue, GenerateSql } from 'src/decorators';
import { SyncEntityType } from 'src/enum';
import { SyncAck } from 'src/types';
@ -13,6 +14,7 @@ type upsertTables = 'users' | 'partners' | 'assets' | 'exif';
export class SyncRepository {
constructor(@InjectKysely() private db: Kysely<DB>) {}
@GenerateSql({ params: [DummyValue.UUID] })
getCheckpoints(sessionId: string) {
return this.db
.selectFrom('session_sync_checkpoints')
@ -33,6 +35,7 @@ export class SyncRepository {
.execute();
}
@GenerateSql({ params: [DummyValue.UUID] })
deleteCheckpoints(sessionId: string, types?: SyncEntityType[]) {
return this.db
.deleteFrom('session_sync_checkpoints')
@ -41,6 +44,7 @@ export class SyncRepository {
.execute();
}
@GenerateSql({ params: [], stream: true })
getUserUpserts(ack?: SyncAck) {
return this.db
.selectFrom('users')
@ -49,6 +53,7 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [], stream: true })
getUserDeletes(ack?: SyncAck) {
return this.db
.selectFrom('users_audit')
@ -57,6 +62,7 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getPartnerUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('partners')
@ -66,6 +72,7 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getPartnerDeletes(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('partners_audit')
@ -75,6 +82,7 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getAssetUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('assets')
@ -84,6 +92,7 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getPartnerAssetsUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('assets')
@ -95,6 +104,7 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getAssetDeletes(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('assets_audit')
@ -105,6 +115,7 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getPartnerAssetDeletes(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('assets_audit')
@ -116,6 +127,7 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getAssetExifsUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('exif')
@ -125,6 +137,7 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getPartnerAssetExifsUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('exif')

View file

@ -1,4 +1,77 @@
import { Expression, ExpressionBuilder, ExpressionWrapper, Nullable, Selectable, Simplify, sql } from 'kysely';
import {
Expression,
ExpressionBuilder,
ExpressionWrapper,
KyselyConfig,
Nullable,
Selectable,
Simplify,
sql,
} from 'kysely';
import { PostgresJSDialect } from 'kysely-postgres-js';
import postgres, { Notice } from 'postgres';
type Ssl = 'require' | 'allow' | 'prefer' | 'verify-full' | boolean | object;
export type PostgresConnectionConfig = {
host?: string;
password?: string;
user?: string;
port?: number;
database?: string;
max?: number;
client_encoding?: string;
ssl?: Ssl;
application_name?: string;
fallback_application_name?: string;
options?: string;
};
export const isValidSsl = (ssl?: string | boolean | object): ssl is Ssl =>
typeof ssl !== 'string' || ssl === 'require' || ssl === 'allow' || ssl === 'prefer' || ssl === 'verify-full';
export const getKyselyConfig = (options: PostgresConnectionConfig): KyselyConfig => {
return {
dialect: new PostgresJSDialect({
postgres: postgres({
onnotice: (notice: Notice) => {
if (notice['severity'] !== 'NOTICE') {
console.warn('Postgres notice:', notice);
}
},
max: 10,
types: {
date: {
to: 1184,
from: [1082, 1114, 1184],
serialize: (x: Date | string) => (x instanceof Date ? x.toISOString() : x),
parse: (x: string) => new Date(x),
},
bigint: {
to: 20,
from: [20, 1700],
parse: (value: string) => Number.parseInt(value),
serialize: (value: number) => value.toString(),
},
},
connection: {
TimeZone: 'UTC',
},
...options,
}),
}),
log(event) {
if (event.level === 'error') {
console.error('Query failed :', {
durationMs: event.queryDurationMillis,
error: event.error,
sql: event.query.sql,
params: event.query.parameters,
});
}
},
};
};
export const asUuid = (id: string | Expression<string>) => sql<string>`${id}::uuid`;