feat: add album asset sync (#19503)

wip: fix album asset exif and some other refactorings

feat: add album assets sync

feat: album to assets relation sync

Co-authored-by: Zack Pollard <zackpollard@ymail.com>
This commit is contained in:
Jason Rasmussen 2025-06-25 12:10:31 -04:00 committed by GitHub
parent b001ba44f5
commit 881a96cdf9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1706 additions and 90 deletions

View file

@ -340,27 +340,21 @@ export const columns = {
apiKey: ['id', 'name', 'userId', 'createdAt', 'updatedAt', 'permissions'],
notification: ['id', 'createdAt', 'level', 'type', 'title', 'description', 'data', 'readAt'],
syncAsset: [
'id',
'ownerId',
'originalFileName',
'thumbhash',
'checksum',
'fileCreatedAt',
'fileModifiedAt',
'localDateTime',
'type',
'deletedAt',
'isFavorite',
'visibility',
'updateId',
'duration',
],
syncAlbumUser: [
'albums_shared_users_users.albumsId as albumId',
'albums_shared_users_users.usersId as userId',
'albums_shared_users_users.role',
'albums_shared_users_users.updateId',
'assets.id',
'assets.ownerId',
'assets.originalFileName',
'assets.thumbhash',
'assets.checksum',
'assets.fileCreatedAt',
'assets.fileModifiedAt',
'assets.localDateTime',
'assets.type',
'assets.deletedAt',
'assets.isFavorite',
'assets.visibility',
'assets.duration',
],
syncAlbumUser: ['album_users.albumsId as albumId', 'album_users.usersId as userId', 'album_users.role'],
stack: ['stack.id', 'stack.primaryAssetId', 'ownerId'],
syncAssetExif: [
'exif.assetId',
@ -388,7 +382,6 @@ export const columns = {
'exif.profileDescription',
'exif.rating',
'exif.fps',
'exif.updateId',
],
exif: [
'exif.assetId',

10
server/src/db.d.ts vendored
View file

@ -92,6 +92,15 @@ export interface AlbumsAssetsAssets {
albumsId: string;
assetsId: string;
createdAt: Generated<Timestamp>;
updatedAt: Generated<Timestamp>;
updateId: Generated<string>;
}
export interface AlbumAssetsAudit {
deletedAt: Generated<Timestamp>;
id: Generated<string>;
albumId: string;
assetId: string;
}
export interface AlbumsSharedUsersUsers {
@ -487,6 +496,7 @@ export interface DB {
albums: Albums;
albums_audit: AlbumsAudit;
albums_assets_assets: AlbumsAssetsAssets;
album_assets_audit: AlbumAssetsAudit;
albums_shared_users_users: AlbumsSharedUsersUsers;
album_users_audit: AlbumUsersAudit;
api_keys: ApiKeys;

View file

@ -145,6 +145,18 @@ export class SyncAlbumV1 {
order!: AssetOrder;
}
export class SyncAlbumToAssetV1 {
albumId!: string;
assetId!: string;
}
export class SyncAlbumToAssetDeleteV1 {
albumId!: string;
assetId!: string;
}
export class SyncAckV1 {}
export type SyncItem = {
[SyncEntityType.UserV1]: SyncUserV1;
[SyncEntityType.UserDeleteV1]: SyncUserDeleteV1;
@ -163,7 +175,14 @@ export type SyncItem = {
[SyncEntityType.AlbumUserV1]: SyncAlbumUserV1;
[SyncEntityType.AlbumUserBackfillV1]: SyncAlbumUserV1;
[SyncEntityType.AlbumUserDeleteV1]: SyncAlbumUserDeleteV1;
[SyncEntityType.SyncAckV1]: object;
[SyncEntityType.AlbumAssetV1]: SyncAssetV1;
[SyncEntityType.AlbumAssetBackfillV1]: SyncAssetV1;
[SyncEntityType.AlbumAssetExifV1]: SyncAssetExifV1;
[SyncEntityType.AlbumAssetExifBackfillV1]: SyncAssetExifV1;
[SyncEntityType.AlbumToAssetV1]: SyncAlbumToAssetV1;
[SyncEntityType.AlbumToAssetBackfillV1]: SyncAlbumToAssetV1;
[SyncEntityType.AlbumToAssetDeleteV1]: SyncAlbumToAssetDeleteV1;
[SyncEntityType.SyncAckV1]: SyncAckV1;
};
const responseDtos = [
@ -178,6 +197,8 @@ const responseDtos = [
SyncAlbumDeleteV1,
SyncAlbumUserV1,
SyncAlbumUserDeleteV1,
SyncAlbumToAssetV1,
SyncAckV1,
];
export const extraSyncModels = responseDtos;

View file

@ -581,6 +581,9 @@ export enum SyncRequestType {
PartnerAssetExifsV1 = 'PartnerAssetExifsV1',
AlbumsV1 = 'AlbumsV1',
AlbumUsersV1 = 'AlbumUsersV1',
AlbumToAssetsV1 = 'AlbumToAssetsV1',
AlbumAssetsV1 = 'AlbumAssetsV1',
AlbumAssetExifsV1 = 'AlbumAssetExifsV1',
}
export enum SyncEntityType {
@ -605,6 +608,13 @@ export enum SyncEntityType {
AlbumUserV1 = 'AlbumUserV1',
AlbumUserBackfillV1 = 'AlbumUserBackfillV1',
AlbumUserDeleteV1 = 'AlbumUserDeleteV1',
AlbumAssetV1 = 'AlbumAssetV1',
AlbumAssetBackfillV1 = 'AlbumAssetBackfillV1',
AlbumAssetExifV1 = 'AlbumAssetExifV1',
AlbumAssetExifBackfillV1 = 'AlbumAssetExifBackfillV1',
AlbumToAssetV1 = 'AlbumToAssetV1',
AlbumToAssetDeleteV1 = 'AlbumToAssetDeleteV1',
AlbumToAssetBackfillV1 = 'AlbumToAssetBackfillV1',
SyncAckV1 = 'SyncAckV1',
}

View file

@ -74,20 +74,20 @@ order by
-- SyncRepository.getAssetUpserts
select
"id",
"ownerId",
"originalFileName",
"thumbhash",
"checksum",
"fileCreatedAt",
"fileModifiedAt",
"localDateTime",
"type",
"deletedAt",
"isFavorite",
"visibility",
"updateId",
"duration"
"assets"."id",
"assets"."ownerId",
"assets"."originalFileName",
"assets"."thumbhash",
"assets"."checksum",
"assets"."fileCreatedAt",
"assets"."fileModifiedAt",
"assets"."localDateTime",
"assets"."type",
"assets"."deletedAt",
"assets"."isFavorite",
"assets"."visibility",
"assets"."duration",
"assets"."updateId"
from
"assets"
where
@ -111,20 +111,20 @@ order by
-- SyncRepository.getPartnerAssetsBackfill
select
"id",
"ownerId",
"originalFileName",
"thumbhash",
"checksum",
"fileCreatedAt",
"fileModifiedAt",
"localDateTime",
"type",
"deletedAt",
"isFavorite",
"visibility",
"updateId",
"duration"
"assets"."id",
"assets"."ownerId",
"assets"."originalFileName",
"assets"."thumbhash",
"assets"."checksum",
"assets"."fileCreatedAt",
"assets"."fileModifiedAt",
"assets"."localDateTime",
"assets"."type",
"assets"."deletedAt",
"assets"."isFavorite",
"assets"."visibility",
"assets"."duration",
"assets"."updateId"
from
"assets"
where
@ -137,20 +137,20 @@ order by
-- SyncRepository.getPartnerAssetsUpserts
select
"id",
"ownerId",
"originalFileName",
"thumbhash",
"checksum",
"fileCreatedAt",
"fileModifiedAt",
"localDateTime",
"type",
"deletedAt",
"isFavorite",
"visibility",
"updateId",
"duration"
"assets"."id",
"assets"."ownerId",
"assets"."originalFileName",
"assets"."thumbhash",
"assets"."checksum",
"assets"."fileCreatedAt",
"assets"."fileModifiedAt",
"assets"."localDateTime",
"assets"."type",
"assets"."deletedAt",
"assets"."isFavorite",
"assets"."visibility",
"assets"."duration",
"assets"."updateId"
from
"assets"
where
@ -365,6 +365,35 @@ where
order by
"albums"."updateId" asc
-- SyncRepository.getAlbumToAssetDeletes
select
"id",
"assetId",
"albumId"
from
"album_assets_audit"
where
"albumId" in (
select
"id"
from
"albums"
where
"ownerId" = $1
union
(
select
"albumUsers"."albumsId" as "id"
from
"albums_shared_users_users" as "albumUsers"
where
"albumUsers"."usersId" = $2
)
)
and "deletedAt" < now() - interval '1 millisecond'
order by
"id" asc
-- SyncRepository.getAlbumUserDeletes
select
"id",
@ -409,12 +438,12 @@ order by
-- SyncRepository.getAlbumUsersBackfill
select
"albums_shared_users_users"."albumsId" as "albumId",
"albums_shared_users_users"."usersId" as "userId",
"albums_shared_users_users"."role",
"albums_shared_users_users"."updateId"
"album_users"."albumsId" as "albumId",
"album_users"."usersId" as "userId",
"album_users"."role",
"album_users"."updateId"
from
"albums_shared_users_users"
"albums_shared_users_users" as "album_users"
where
"albumsId" = $1
and "updatedAt" < now() - interval '1 millisecond'
@ -425,15 +454,15 @@ order by
-- SyncRepository.getAlbumUserUpserts
select
"albums_shared_users_users"."albumsId" as "albumId",
"albums_shared_users_users"."usersId" as "userId",
"albums_shared_users_users"."role",
"albums_shared_users_users"."updateId"
"album_users"."albumsId" as "albumId",
"album_users"."usersId" as "userId",
"album_users"."role",
"album_users"."updateId"
from
"albums_shared_users_users"
"albums_shared_users_users" as "album_users"
where
"albums_shared_users_users"."updatedAt" < now() - interval '1 millisecond'
and "albums_shared_users_users"."albumsId" in (
"album_users"."updatedAt" < now() - interval '1 millisecond'
and "album_users"."albumsId" in (
select
"id"
from
@ -451,4 +480,175 @@ where
)
)
order by
"albums_shared_users_users"."updateId" asc
"album_users"."updateId" asc
-- SyncRepository.getAlbumAssetsBackfill
select
"assets"."id",
"assets"."ownerId",
"assets"."originalFileName",
"assets"."thumbhash",
"assets"."checksum",
"assets"."fileCreatedAt",
"assets"."fileModifiedAt",
"assets"."localDateTime",
"assets"."type",
"assets"."deletedAt",
"assets"."isFavorite",
"assets"."visibility",
"assets"."duration",
"assets"."updateId"
from
"assets"
inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "assets"."id"
where
"album_assets"."albumsId" = $1
and "assets"."updatedAt" < now() - interval '1 millisecond'
and "assets"."updateId" <= $2
and "assets"."updateId" >= $3
order by
"assets"."updateId" asc
-- SyncRepository.getAlbumAssetsUpserts
select
"assets"."id",
"assets"."ownerId",
"assets"."originalFileName",
"assets"."thumbhash",
"assets"."checksum",
"assets"."fileCreatedAt",
"assets"."fileModifiedAt",
"assets"."localDateTime",
"assets"."type",
"assets"."deletedAt",
"assets"."isFavorite",
"assets"."visibility",
"assets"."duration",
"assets"."updateId"
from
"assets"
inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "assets"."id"
inner join "albums" on "albums"."id" = "album_assets"."albumsId"
left join "albums_shared_users_users" as "album_users" on "album_users"."albumsId" = "album_assets"."albumsId"
where
"assets"."updatedAt" < now() - interval '1 millisecond'
and (
"albums"."ownerId" = $1
or "album_users"."usersId" = $2
)
order by
"assets"."updateId" asc
-- SyncRepository.getAlbumToAssetBackfill
select
"album_assets"."assetsId" as "assetId",
"album_assets"."albumsId" as "albumId",
"album_assets"."updateId"
from
"albums_assets_assets" as "album_assets"
where
"album_assets"."albumsId" = $1
and "album_assets"."updatedAt" < now() - interval '1 millisecond'
and "album_assets"."updateId" <= $2
and "album_assets"."updateId" >= $3
order by
"album_assets"."updateId" asc
-- SyncRepository.getAlbumToAssetUpserts
select
"album_assets"."assetsId" as "assetId",
"album_assets"."albumsId" as "albumId",
"album_assets"."updateId"
from
"albums_assets_assets" as "album_assets"
inner join "albums" on "albums"."id" = "album_assets"."albumsId"
left join "albums_shared_users_users" as "album_users" on "album_users"."albumsId" = "album_assets"."albumsId"
where
"album_assets"."updatedAt" < now() - interval '1 millisecond'
and (
"albums"."ownerId" = $1
or "album_users"."usersId" = $2
)
order by
"album_assets"."updateId" asc
-- SyncRepository.getAlbumAssetExifsBackfill
select
"exif"."assetId",
"exif"."description",
"exif"."exifImageWidth",
"exif"."exifImageHeight",
"exif"."fileSizeInByte",
"exif"."orientation",
"exif"."dateTimeOriginal",
"exif"."modifyDate",
"exif"."timeZone",
"exif"."latitude",
"exif"."longitude",
"exif"."projectionType",
"exif"."city",
"exif"."state",
"exif"."country",
"exif"."make",
"exif"."model",
"exif"."lensModel",
"exif"."fNumber",
"exif"."focalLength",
"exif"."iso",
"exif"."exposureTime",
"exif"."profileDescription",
"exif"."rating",
"exif"."fps",
"exif"."updateId"
from
"exif"
inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "exif"."assetId"
where
"album_assets"."albumsId" = $1
and "exif"."updatedAt" < now() - interval '1 millisecond'
and "exif"."updateId" <= $2
and "exif"."updateId" >= $3
order by
"exif"."updateId" asc
-- SyncRepository.getAlbumAssetExifsUpserts
select
"exif"."assetId",
"exif"."description",
"exif"."exifImageWidth",
"exif"."exifImageHeight",
"exif"."fileSizeInByte",
"exif"."orientation",
"exif"."dateTimeOriginal",
"exif"."modifyDate",
"exif"."timeZone",
"exif"."latitude",
"exif"."longitude",
"exif"."projectionType",
"exif"."city",
"exif"."state",
"exif"."country",
"exif"."make",
"exif"."model",
"exif"."lensModel",
"exif"."fNumber",
"exif"."focalLength",
"exif"."iso",
"exif"."exposureTime",
"exif"."profileDescription",
"exif"."rating",
"exif"."fps",
"exif"."updateId"
from
"exif"
inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "exif"."assetId"
inner join "albums" on "albums"."id" = "album_assets"."albumsId"
left join "albums_shared_users_users" as "album_users" on "album_users"."albumsId" = "album_assets"."albumsId"
where
"exif"."updatedAt" < now() - interval '1 millisecond'
and (
"albums"."ownerId" = $1
or "album_users"."usersId" = $2
)
order by
"exif"."updateId" asc

View file

@ -7,7 +7,13 @@ import { DummyValue, GenerateSql } from 'src/decorators';
import { SyncEntityType } from 'src/enum';
import { SyncAck } from 'src/types';
type AuditTables = 'users_audit' | 'partners_audit' | 'assets_audit' | 'albums_audit' | 'album_users_audit';
type AuditTables =
| 'users_audit'
| 'partners_audit'
| 'assets_audit'
| 'albums_audit'
| 'album_users_audit'
| 'album_assets_audit';
type UpsertTables = 'users' | 'partners' | 'assets' | 'exif' | 'albums' | 'albums_shared_users_users';
@Injectable()
@ -87,6 +93,7 @@ export class SyncRepository {
return this.db
.selectFrom('assets')
.select(columns.syncAsset)
.select('assets.updateId')
.where('ownerId', '=', userId)
.$call((qb) => this.upsertTableFilters(qb, ack))
.stream();
@ -109,6 +116,7 @@ export class SyncRepository {
return this.db
.selectFrom('assets')
.select(columns.syncAsset)
.select('assets.updateId')
.where('ownerId', '=', partnerId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('updateId', '<=', beforeUpdateId)
@ -122,6 +130,7 @@ export class SyncRepository {
return this.db
.selectFrom('assets')
.select(columns.syncAsset)
.select('assets.updateId')
.where('ownerId', 'in', (eb) =>
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
)
@ -156,6 +165,7 @@ export class SyncRepository {
return this.db
.selectFrom('exif')
.select(columns.syncAssetExif)
.select('exif.updateId')
.where('assetId', 'in', (eb) => eb.selectFrom('assets').select('id').where('ownerId', '=', userId))
.$call((qb) => this.upsertTableFilters(qb, ack))
.stream();
@ -166,6 +176,7 @@ export class SyncRepository {
return this.db
.selectFrom('exif')
.select(columns.syncAssetExif)
.select('exif.updateId')
.innerJoin('assets', 'assets.id', 'exif.assetId')
.where('assets.ownerId', '=', partnerId)
.where('exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
@ -180,6 +191,7 @@ export class SyncRepository {
return this.db
.selectFrom('exif')
.select(columns.syncAssetExif)
.select('exif.updateId')
.where('assetId', 'in', (eb) =>
eb
.selectFrom('assets')
@ -227,6 +239,33 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getAlbumToAssetDeletes(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('album_assets_audit')
.select(['id', 'assetId', 'albumId'])
.where((eb) =>
eb(
'albumId',
'in',
eb
.selectFrom('albums')
.select(['id'])
.where('ownerId', '=', userId)
.union((eb) =>
eb.parens(
eb
.selectFrom('albums_shared_users_users as albumUsers')
.select(['albumUsers.albumsId as id'])
.where('albumUsers.usersId', '=', userId),
),
),
),
)
.$call((qb) => this.auditTableFilters(qb, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getAlbumUserDeletes(userId: string, ack?: SyncAck) {
return this.db
@ -266,11 +305,12 @@ export class SyncRepository {
.execute();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getAlbumUsersBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db
.selectFrom('albums_shared_users_users')
.selectFrom('albums_shared_users_users as album_users')
.select(columns.syncAlbumUser)
.select('album_users.updateId')
.where('albumsId', '=', albumId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('updateId', '<=', beforeUpdateId)
@ -282,14 +322,15 @@ export class SyncRepository {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getAlbumUserUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('albums_shared_users_users')
.selectFrom('albums_shared_users_users as album_users')
.select(columns.syncAlbumUser)
.where('albums_shared_users_users.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('albums_shared_users_users.updateId', '>', ack!.updateId))
.orderBy('albums_shared_users_users.updateId', 'asc')
.select('album_users.updateId')
.where('album_users.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('album_users.updateId', '>', ack!.updateId))
.orderBy('album_users.updateId', 'asc')
.where((eb) =>
eb(
'albums_shared_users_users.albumsId',
'album_users.albumsId',
'in',
eb
.selectFrom('albums')
@ -308,6 +349,95 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getAlbumAssetsBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db
.selectFrom('assets')
.innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'assets.id')
.select(columns.syncAsset)
.select('assets.updateId')
.where('album_assets.albumsId', '=', albumId)
.where('assets.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('assets.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('assets.updateId', '>=', afterUpdateId!))
.orderBy('assets.updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getAlbumAssetsUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('assets')
.innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'assets.id')
.select(columns.syncAsset)
.select('assets.updateId')
.where('assets.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('assets.updateId', '>', ack!.updateId))
.orderBy('assets.updateId', 'asc')
.innerJoin('albums', 'albums.id', 'album_assets.albumsId')
.leftJoin('albums_shared_users_users as album_users', 'album_users.albumsId', 'album_assets.albumsId')
.where((eb) => eb.or([eb('albums.ownerId', '=', userId), eb('album_users.usersId', '=', userId)]))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getAlbumToAssetBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db
.selectFrom('albums_assets_assets as album_assets')
.select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId'])
.where('album_assets.albumsId', '=', albumId)
.where('album_assets.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('album_assets.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('album_assets.updateId', '>=', afterUpdateId!))
.orderBy('album_assets.updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getAlbumToAssetUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('albums_assets_assets as album_assets')
.select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId'])
.where('album_assets.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('album_assets.updateId', '>', ack!.updateId))
.orderBy('album_assets.updateId', 'asc')
.innerJoin('albums', 'albums.id', 'album_assets.albumsId')
.leftJoin('albums_shared_users_users as album_users', 'album_users.albumsId', 'album_assets.albumsId')
.where((eb) => eb.or([eb('albums.ownerId', '=', userId), eb('album_users.usersId', '=', userId)]))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getAlbumAssetExifsBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db
.selectFrom('exif')
.innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'exif.assetId')
.select(columns.syncAssetExif)
.select('exif.updateId')
.where('album_assets.albumsId', '=', albumId)
.where('exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('exif.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('exif.updateId', '>=', afterUpdateId!))
.orderBy('exif.updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getAlbumAssetExifsUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('exif')
.innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'exif.assetId')
.select(columns.syncAssetExif)
.select('exif.updateId')
.where('exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('exif.updateId', '>', ack!.updateId))
.orderBy('exif.updateId', 'asc')
.innerJoin('albums', 'albums.id', 'album_assets.albumsId')
.leftJoin('albums_shared_users_users as album_users', 'album_users.albumsId', 'album_assets.albumsId')
.where((eb) => eb.or([eb('albums.ownerId', '=', userId), eb('album_users.usersId', '=', userId)]))
.stream();
}
private auditTableFilters<T extends keyof Pick<DB, AuditTables>, D>(qb: SelectQueryBuilder<DB, T, D>, ack?: SyncAck) {
const builder = qb as SelectQueryBuilder<DB, AuditTables, D>;
return builder

View file

@ -142,6 +142,20 @@ export const albums_delete_audit = registerFunction({
synchronize: false,
});
export const album_assets_delete_audit = registerFunction({
name: 'album_assets_delete_audit',
returnType: 'TRIGGER',
language: 'PLPGSQL',
body: `
BEGIN
INSERT INTO album_assets_audit ("albumId", "assetId")
SELECT "albumsId", "assetsId" FROM OLD
WHERE "albumsId" IN (SELECT "id" FROM albums WHERE "id" IN (SELECT "albumsId" FROM OLD));
RETURN NULL;
END`,
synchronize: false,
});
export const album_users_delete_audit = registerFunction({
name: 'album_users_delete_audit',
returnType: 'TRIGGER',

View file

@ -13,6 +13,7 @@ import {
users_delete_audit,
} from 'src/schema/functions';
import { ActivityTable } from 'src/schema/tables/activity.table';
import { AlbumAssetAuditTable } from 'src/schema/tables/album-asset-audit.table';
import { AlbumAssetTable } from 'src/schema/tables/album-asset.table';
import { AlbumAuditTable } from 'src/schema/tables/album-audit.table';
import { AlbumUserAuditTable } from 'src/schema/tables/album-user-audit.table';
@ -58,6 +59,7 @@ export class ImmichDatabase {
tables = [
ActivityTable,
AlbumAssetTable,
AlbumAssetAuditTable,
AlbumAuditTable,
AlbumUserAuditTable,
AlbumUserTable,

View file

@ -0,0 +1,18 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await sql`ALTER TABLE "albums_assets_assets" ADD "updatedAt" timestamp with time zone NOT NULL DEFAULT now();`.execute(db);
await sql`ALTER TABLE "albums_assets_assets" ADD "updateId" uuid NOT NULL DEFAULT immich_uuid_v7();`.execute(db);
await sql`CREATE INDEX "IDX_album_assets_update_id" ON "albums_assets_assets" ("updateId")`.execute(db);
await sql`CREATE OR REPLACE TRIGGER "album_assets_updated_at"
BEFORE UPDATE ON "albums_assets_assets"
FOR EACH ROW
EXECUTE FUNCTION updated_at();`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
await sql`DROP INDEX "IDX_album_assets_update_id";`.execute(db);
await sql`ALTER TABLE "albums_assets_assets" DROP COLUMN "updatedAt";`.execute(db);
await sql`ALTER TABLE "albums_assets_assets" DROP COLUMN "updateId";`.execute(db);
await sql`DROP TRIGGER "album_assets_updated_at" ON "albums_assets_assets";`.execute(db);
}

View file

@ -0,0 +1,22 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await sql`CREATE TABLE "album_assets_audit" ("id" uuid NOT NULL DEFAULT immich_uuid_v7(), "albumId" uuid NOT NULL, "assetId" uuid NOT NULL, "deletedAt" timestamp with time zone NOT NULL DEFAULT clock_timestamp());`.execute(db);
await sql`ALTER TABLE "album_assets_audit" ADD CONSTRAINT "PK_32969b576ec8f78d84f37c2eb2d" PRIMARY KEY ("id");`.execute(db);
await sql`CREATE INDEX "IDX_album_assets_audit_album_id" ON "album_assets_audit" ("albumId")`.execute(db);
await sql`CREATE INDEX "IDX_album_assets_audit_asset_id" ON "album_assets_audit" ("assetId")`.execute(db);
await sql`CREATE INDEX "IDX_album_assets_audit_deleted_at" ON "album_assets_audit" ("deletedAt")`.execute(db);
await sql`CREATE OR REPLACE TRIGGER "album_assets_updated_at"
BEFORE UPDATE ON "albums_assets_assets"
FOR EACH ROW
EXECUTE FUNCTION updated_at();`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
await sql`DROP TRIGGER "album_assets_updated_at" ON "albums_assets_assets";`.execute(db);
await sql`DROP INDEX "IDX_album_assets_audit_album_id";`.execute(db);
await sql`DROP INDEX "IDX_album_assets_audit_asset_id";`.execute(db);
await sql`DROP INDEX "IDX_album_assets_audit_deleted_at";`.execute(db);
await sql`ALTER TABLE "album_assets_audit" DROP CONSTRAINT "PK_32969b576ec8f78d84f37c2eb2d";`.execute(db);
await sql`DROP TABLE "album_assets_audit";`.execute(db);
}

View file

@ -0,0 +1,28 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await sql`CREATE OR REPLACE FUNCTION album_assets_delete_audit()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS $$
BEGIN
INSERT INTO album_assets_audit ("albumId", "assetId")
SELECT "albumsId", "assetsId" FROM OLD
WHERE "albumsId" IN (SELECT "id" FROM albums WHERE "id" IN (SELECT "albumsId" FROM OLD));
RETURN NULL;
END
$$;`.execute(db);
await sql`ALTER TABLE "album_assets_audit" ADD CONSTRAINT "FK_8047b44b812619a3c75a2839b0d" FOREIGN KEY ("albumId") REFERENCES "albums" ("id") ON UPDATE CASCADE ON DELETE CASCADE;`.execute(db);
await sql`CREATE OR REPLACE TRIGGER "album_assets_delete_audit"
AFTER DELETE ON "albums_assets_assets"
REFERENCING OLD TABLE AS "old"
FOR EACH STATEMENT
WHEN (pg_trigger_depth() <= 1)
EXECUTE FUNCTION album_assets_delete_audit();`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
await sql`DROP TRIGGER "album_assets_delete_audit" ON "albums_assets_assets";`.execute(db);
await sql`ALTER TABLE "album_assets_audit" DROP CONSTRAINT "FK_8047b44b812619a3c75a2839b0d";`.execute(db);
await sql`DROP FUNCTION album_assets_delete_audit;`.execute(db);
}

View file

@ -0,0 +1,23 @@
import { PrimaryGeneratedUuidV7Column } from 'src/decorators';
import { AlbumTable } from 'src/schema/tables/album.table';
import { Column, CreateDateColumn, ForeignKeyColumn, Table } from 'src/sql-tools';
@Table('album_assets_audit')
export class AlbumAssetAuditTable {
@PrimaryGeneratedUuidV7Column()
id!: string;
@ForeignKeyColumn(() => AlbumTable, {
type: 'uuid',
indexName: 'IDX_album_assets_audit_album_id',
onDelete: 'CASCADE',
onUpdate: 'CASCADE',
})
albumId!: string;
@Column({ type: 'uuid', indexName: 'IDX_album_assets_audit_asset_id' })
assetId!: string;
@CreateDateColumn({ default: () => 'clock_timestamp()', indexName: 'IDX_album_assets_audit_deleted_at' })
deletedAt!: Date;
}

View file

@ -1,8 +1,18 @@
import { UpdatedAtTrigger, UpdateIdColumn } from 'src/decorators';
import { album_assets_delete_audit } from 'src/schema/functions';
import { AlbumTable } from 'src/schema/tables/album.table';
import { AssetTable } from 'src/schema/tables/asset.table';
import { CreateDateColumn, ForeignKeyColumn, Table } from 'src/sql-tools';
import { AfterDeleteTrigger, CreateDateColumn, ForeignKeyColumn, Table, UpdateDateColumn } from 'src/sql-tools';
@Table({ name: 'albums_assets_assets', primaryConstraintName: 'PK_c67bc36fa845fb7b18e0e398180' })
@UpdatedAtTrigger('album_assets_updated_at')
@AfterDeleteTrigger({
name: 'album_assets_delete_audit',
scope: 'statement',
function: album_assets_delete_audit,
referencingOldTableAs: 'old',
when: 'pg_trigger_depth() <= 1',
})
export class AlbumAssetTable {
@ForeignKeyColumn(() => AlbumTable, { onDelete: 'CASCADE', onUpdate: 'CASCADE', nullable: false, primary: true })
albumsId!: string;
@ -12,4 +22,10 @@ export class AlbumAssetTable {
@CreateDateColumn()
createdAt!: Date;
@UpdateDateColumn()
updatedAt!: Date;
@UpdateIdColumn({ indexName: 'IDX_album_assets_update_id' })
updateId!: string;
}

View file

@ -57,11 +57,14 @@ export const SYNC_TYPES_ORDER = [
SyncRequestType.UsersV1,
SyncRequestType.PartnersV1,
SyncRequestType.AssetsV1,
SyncRequestType.AssetExifsV1,
SyncRequestType.PartnerAssetsV1,
SyncRequestType.PartnerAssetExifsV1,
SyncRequestType.AlbumsV1,
SyncRequestType.AlbumUsersV1,
SyncRequestType.AlbumAssetsV1,
SyncRequestType.AlbumToAssetsV1,
SyncRequestType.AssetExifsV1,
SyncRequestType.AlbumAssetExifsV1,
SyncRequestType.PartnerAssetExifsV1,
];
const throwSessionRequired = () => {
@ -164,6 +167,21 @@ export class SyncService extends BaseService {
break;
}
case SyncRequestType.AlbumAssetsV1: {
await this.syncAlbumAssetsV1(response, checkpointMap, auth, sessionId);
break;
}
case SyncRequestType.AlbumToAssetsV1: {
await this.syncAlbumToAssetsV1(response, checkpointMap, auth, sessionId);
break;
}
case SyncRequestType.AlbumAssetExifsV1: {
await this.syncAlbumAssetExifsV1(response, checkpointMap, auth, sessionId);
break;
}
default: {
this.logger.warn(`Unsupported sync type: ${type}`);
break;
@ -380,6 +398,147 @@ export class SyncService extends BaseService {
}
}
private async syncAlbumAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) {
const backfillType = SyncEntityType.AlbumAssetBackfillV1;
const upsertType = SyncEntityType.AlbumAssetV1;
const backfillCheckpoint = checkpointMap[backfillType];
const upsertCheckpoint = checkpointMap[upsertType];
const albums = await this.syncRepository.getAlbumBackfill(auth.user.id, backfillCheckpoint?.updateId);
if (upsertCheckpoint) {
const endId = upsertCheckpoint.updateId;
for (const album of albums) {
const createId = album.createId;
if (isEntityBackfillComplete(createId, backfillCheckpoint)) {
continue;
}
const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.getAlbumAssetsBackfill(album.id, startId, endId);
for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [createId, updateId], data: mapSyncAssetV1(data) });
}
sendEntityBackfillCompleteAck(response, backfillType, createId);
}
} else if (albums.length > 0) {
await this.upsertBackfillCheckpoint({
type: backfillType,
sessionId,
createId: albums.at(-1)!.createId,
});
}
const upserts = this.syncRepository.getAlbumAssetsUpserts(auth.user.id, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) });
}
}
private async syncAlbumAssetExifsV1(
response: Writable,
checkpointMap: CheckpointMap,
auth: AuthDto,
sessionId: string,
) {
const backfillType = SyncEntityType.AlbumAssetExifBackfillV1;
const upsertType = SyncEntityType.AlbumAssetExifV1;
const backfillCheckpoint = checkpointMap[backfillType];
const upsertCheckpoint = checkpointMap[upsertType];
const albums = await this.syncRepository.getAlbumBackfill(auth.user.id, backfillCheckpoint?.updateId);
if (upsertCheckpoint) {
const endId = upsertCheckpoint.updateId;
for (const album of albums) {
const createId = album.createId;
if (isEntityBackfillComplete(createId, backfillCheckpoint)) {
continue;
}
const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.getAlbumAssetExifsBackfill(album.id, startId, endId);
for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [createId, updateId], data });
}
sendEntityBackfillCompleteAck(response, backfillType, createId);
}
} else if (albums.length > 0) {
await this.upsertBackfillCheckpoint({
type: backfillType,
sessionId,
createId: albums.at(-1)!.createId,
});
}
const upserts = this.syncRepository.getAlbumAssetExifsUpserts(auth.user.id, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data });
}
}
private async syncAlbumToAssetsV1(
response: Writable,
checkpointMap: CheckpointMap,
auth: AuthDto,
sessionId: string,
) {
const backfillType = SyncEntityType.AlbumToAssetBackfillV1;
const upsertType = SyncEntityType.AlbumToAssetV1;
const backfillCheckpoint = checkpointMap[backfillType];
const upsertCheckpoint = checkpointMap[upsertType];
const deletes = this.syncRepository.getAlbumToAssetDeletes(
auth.user.id,
checkpointMap[SyncEntityType.AlbumToAssetDeleteV1],
);
for await (const { id, ...data } of deletes) {
send(response, { type: SyncEntityType.AlbumToAssetDeleteV1, ids: [id], data });
}
const albums = await this.syncRepository.getAlbumBackfill(auth.user.id, backfillCheckpoint?.updateId);
if (upsertCheckpoint) {
const endId = upsertCheckpoint.updateId;
for (const album of albums) {
const createId = album.createId;
if (isEntityBackfillComplete(createId, backfillCheckpoint)) {
continue;
}
const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.getAlbumToAssetBackfill(album.id, startId, endId);
for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [createId, updateId], data });
}
sendEntityBackfillCompleteAck(response, backfillType, createId);
}
} else if (albums.length > 0) {
await this.upsertBackfillCheckpoint({
type: backfillType,
sessionId,
createId: albums.at(-1)!.createId,
});
}
const upserts = this.syncRepository.getAlbumToAssetUpserts(auth.user.id, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data });
}
}
private async upsertBackfillCheckpoint(item: { type: SyncEntityType; sessionId: string; createId: string }) {
const { type, sessionId, createId } = item;
await this.syncRepository.upsertCheckpoints([