fix: partner asset and exif sync backfill (#19224)

* fix: partner asset sync backfill

* fix: add partner asset exif backfill

* ci: output content of files that have changed
This commit is contained in:
Zack Pollard 2025-06-17 14:56:54 +01:00 committed by GitHub
parent db68d1af9b
commit 749f63e4a0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 607 additions and 37 deletions

View file

@ -209,6 +209,7 @@ export type Partner = {
sharedWithId: string;
sharedWith: User;
createdAt: Date;
createId: string;
updatedAt: Date;
updateId: string;
inTimeline: boolean;

1
server/src/db.d.ts vendored
View file

@ -332,6 +332,7 @@ export interface PartnersAudit {
export interface Partners {
createdAt: Generated<Timestamp>;
createId: Generated<string>;
inTimeline: Generated<boolean>;
sharedById: string;
sharedWithId: string;

View file

@ -14,6 +14,9 @@ const GeneratedUuidV7Column = (options: Omit<ColumnOptions, 'type' | 'default' |
export const UpdateIdColumn = (options: Omit<ColumnOptions, 'type' | 'default' | 'nullable'> = {}) =>
GeneratedUuidV7Column(options);
export const CreateIdColumn = (options: Omit<ColumnOptions, 'type' | 'default' | 'nullable'> = {}) =>
GeneratedUuidV7Column(options);
export const PrimaryGeneratedUuidV7Column = () => GeneratedUuidV7Column({ primary: true });
export const UpdatedAtTrigger = (name: string) =>

View file

@ -154,12 +154,15 @@ export type SyncItem = {
[SyncEntityType.AssetDeleteV1]: SyncAssetDeleteV1;
[SyncEntityType.AssetExifV1]: SyncAssetExifV1;
[SyncEntityType.PartnerAssetV1]: SyncAssetV1;
[SyncEntityType.PartnerAssetBackfillV1]: SyncAssetV1;
[SyncEntityType.PartnerAssetDeleteV1]: SyncAssetDeleteV1;
[SyncEntityType.PartnerAssetExifV1]: SyncAssetExifV1;
[SyncEntityType.PartnerAssetExifBackfillV1]: SyncAssetExifV1;
[SyncEntityType.AlbumV1]: SyncAlbumV1;
[SyncEntityType.AlbumDeleteV1]: SyncAlbumDeleteV1;
[SyncEntityType.AlbumUserV1]: SyncAlbumUserV1;
[SyncEntityType.AlbumUserDeleteV1]: SyncAlbumUserDeleteV1;
[SyncEntityType.SyncAckV1]: object;
};
const responseDtos = [

View file

@ -595,13 +595,17 @@ export enum SyncEntityType {
AssetExifV1 = 'AssetExifV1',
PartnerAssetV1 = 'PartnerAssetV1',
PartnerAssetBackfillV1 = 'PartnerAssetBackfillV1',
PartnerAssetDeleteV1 = 'PartnerAssetDeleteV1',
PartnerAssetExifV1 = 'PartnerAssetExifV1',
PartnerAssetExifBackfillV1 = 'PartnerAssetExifBackfillV1',
AlbumV1 = 'AlbumV1',
AlbumDeleteV1 = 'AlbumDeleteV1',
AlbumUserV1 = 'AlbumUserV1',
AlbumUserDeleteV1 = 'AlbumUserDeleteV1',
SyncAckV1 = 'SyncAckV1',
}
export enum NotificationLevel {

View file

@ -96,6 +96,45 @@ where
order by
"updateId" asc
-- SyncRepository.getPartnerBackfill
select
"sharedById",
"createId"
from
"partners"
where
"sharedWithId" = $1
and "createId" >= $2
and "createdAt" < now() - interval '1 millisecond'
order by
"partners"."createId" asc
-- SyncRepository.getPartnerAssetsBackfill
select
"id",
"ownerId",
"originalFileName",
"thumbhash",
"checksum",
"fileCreatedAt",
"fileModifiedAt",
"localDateTime",
"type",
"deletedAt",
"isFavorite",
"visibility",
"updateId",
"duration"
from
"assets"
where
"ownerId" = $1
and "updatedAt" < now() - interval '1 millisecond'
and "updateId" < $2
and "updateId" >= $3
order by
"updateId" asc
-- SyncRepository.getPartnerAssetsUpserts
select
"id",
@ -201,6 +240,45 @@ where
order by
"updateId" asc
-- SyncRepository.getPartnerAssetExifsBackfill
select
"exif"."assetId",
"exif"."description",
"exif"."exifImageWidth",
"exif"."exifImageHeight",
"exif"."fileSizeInByte",
"exif"."orientation",
"exif"."dateTimeOriginal",
"exif"."modifyDate",
"exif"."timeZone",
"exif"."latitude",
"exif"."longitude",
"exif"."projectionType",
"exif"."city",
"exif"."state",
"exif"."country",
"exif"."make",
"exif"."model",
"exif"."lensModel",
"exif"."fNumber",
"exif"."focalLength",
"exif"."iso",
"exif"."exposureTime",
"exif"."profileDescription",
"exif"."rating",
"exif"."fps",
"exif"."updateId"
from
"exif"
inner join "assets" on "assets"."id" = "exif"."assetId"
where
"assets"."ownerId" = $1
and "exif"."updatedAt" < now() - interval '1 millisecond'
and "exif"."updateId" < $2
and "exif"."updateId" >= $3
order by
"exif"."updateId" asc
-- SyncRepository.getPartnerAssetExifsUpserts
select
"exif"."assetId",

View file

@ -92,6 +92,31 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] })
getPartnerBackfill(userId: string, afterCreateId?: string) {
  // Partners shared *with* this user, ordered by creation id so the sync
  // stream can checkpoint and resume the backfill deterministically.
  const shared = this.db
    .selectFrom('partners')
    .select(['sharedById', 'createId'])
    .where('sharedWithId', '=', userId);
  // Resume at (and including) the checkpointed partner when a cursor is given;
  // '>=' is deliberate — the caller tracks per-partner completion separately.
  const resumable = afterCreateId ? shared.where('createId', '>=', afterCreateId) : shared;
  return resumable
    // 1ms grace window — presumably to skip rows still settling from
    // concurrent writes; confirm against the other sync queries' convention.
    .where('createdAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
    .orderBy('partners.createId', 'asc')
    .execute();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getPartnerAssetsBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
  // Stream the partner's assets whose updateId precedes the current
  // PartnerAssetV1 checkpoint (beforeUpdateId), i.e. history the client
  // missed before this partnership existed.
  const scoped = this.db
    .selectFrom('assets')
    .select(columns.syncAsset)
    .where('ownerId', '=', partnerId)
    // Same 1ms settling window used by the other sync queries.
    .where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
    .where('updateId', '<', beforeUpdateId);
  // Optional resume cursor: inclusive so an interrupted page is re-sent.
  const resumable = afterUpdateId ? scoped.where('updateId', '>=', afterUpdateId) : scoped;
  return resumable.orderBy('updateId', 'asc').stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getPartnerAssetsUpserts(userId: string, ack?: SyncAck) {
return this.db
@ -136,6 +161,20 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getPartnerAssetExifsBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
  // Stream exif rows for the partner's assets with updateId below the current
  // PartnerAssetExifV1 checkpoint — the exif counterpart of the asset backfill.
  const scoped = this.db
    .selectFrom('exif')
    .select(columns.syncAssetExif)
    // Join to assets to scope exif rows by the partner who owns the asset.
    .innerJoin('assets', 'assets.id', 'exif.assetId')
    .where('assets.ownerId', '=', partnerId)
    // Same 1ms settling window used by the other sync queries.
    .where('exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
    .where('exif.updateId', '<', beforeUpdateId);
  // Optional resume cursor: inclusive so an interrupted page is re-sent.
  const resumable = afterUpdateId ? scoped.where('exif.updateId', '>=', afterUpdateId) : scoped;
  return resumable.orderBy('exif.updateId', 'asc').stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getPartnerAssetExifsUpserts(userId: string, ack?: SyncAck) {
return this.db

View file

@ -0,0 +1,10 @@
import { Kysely, sql } from 'kysely';
// Adds a creation-ordered id column to "partners" so partner rows can be
// checkpointed by creation time during sync backfill.
export async function up(db: Kysely<any>): Promise<void> {
// New inserts get an id from immich_uuid_v7() (project-defined DB function;
// presumably a UUIDv7 generator — confirm in schema/functions).
await sql`ALTER TABLE "partners" ADD "createId" uuid NOT NULL DEFAULT immich_uuid_v7();`.execute(db);
// Backfill existing rows from "createdAt" so their createId ordering matches
// their actual creation order (assumes immich_uuid_v7 accepts a timestamp
// argument to seed the v7 time bits — TODO confirm).
await sql`UPDATE "partners" SET "createId" = immich_uuid_v7("createdAt")`.execute(db);
}
// Reverts the migration by dropping the "createId" column added in up().
export async function down(db: Kysely<any>): Promise<void> {
await sql`ALTER TABLE "partners" DROP COLUMN "createId";`.execute(db);
}

View file

@ -1,4 +1,4 @@
import { UpdatedAtTrigger, UpdateIdColumn } from 'src/decorators';
import { CreateIdColumn, UpdatedAtTrigger, UpdateIdColumn } from 'src/decorators';
import { partners_delete_audit } from 'src/schema/functions';
import { UserTable } from 'src/schema/tables/user.table';
import { AfterDeleteTrigger, Column, CreateDateColumn, ForeignKeyColumn, Table, UpdateDateColumn } from 'src/sql-tools';
@ -27,6 +27,9 @@ export class PartnerTable {
@CreateDateColumn()
createdAt!: Date;
@CreateIdColumn()
createId!: string;
@UpdateDateColumn()
updatedAt!: Date;

View file

@ -20,7 +20,7 @@ import { SyncAck } from 'src/types';
import { getMyPartnerIds } from 'src/utils/asset.util';
import { hexOrBufferToBase64 } from 'src/utils/bytes';
import { setIsEqual } from 'src/utils/set';
import { fromAck, serialize } from 'src/utils/sync';
import { fromAck, serialize, toAck } from 'src/utils/sync';
const FULL_SYNC = { needsFullSync: true, deleted: [], upserted: [] };
export const SYNC_TYPES_ORDER = [
@ -98,12 +98,12 @@ export class SyncService extends BaseService {
case SyncRequestType.UsersV1: {
const deletes = this.syncRepository.getUserDeletes(checkpointMap[SyncEntityType.UserDeleteV1]);
for await (const { id, ...data } of deletes) {
response.write(serialize({ type: SyncEntityType.UserDeleteV1, updateId: id, data }));
response.write(serialize({ type: SyncEntityType.UserDeleteV1, ids: [id], data }));
}
const upserts = this.syncRepository.getUserUpserts(checkpointMap[SyncEntityType.UserV1]);
for await (const { updateId, ...data } of upserts) {
response.write(serialize({ type: SyncEntityType.UserV1, updateId, data }));
response.write(serialize({ type: SyncEntityType.UserV1, ids: [updateId], data }));
}
break;
@ -115,12 +115,12 @@ export class SyncService extends BaseService {
checkpointMap[SyncEntityType.PartnerDeleteV1],
);
for await (const { id, ...data } of deletes) {
response.write(serialize({ type: SyncEntityType.PartnerDeleteV1, updateId: id, data }));
response.write(serialize({ type: SyncEntityType.PartnerDeleteV1, ids: [id], data }));
}
const upserts = this.syncRepository.getPartnerUpserts(auth.user.id, checkpointMap[SyncEntityType.PartnerV1]);
for await (const { updateId, ...data } of upserts) {
response.write(serialize({ type: SyncEntityType.PartnerV1, updateId, data }));
response.write(serialize({ type: SyncEntityType.PartnerV1, ids: [updateId], data }));
}
break;
@ -132,7 +132,7 @@ export class SyncService extends BaseService {
checkpointMap[SyncEntityType.AssetDeleteV1],
);
for await (const { id, ...data } of deletes) {
response.write(serialize({ type: SyncEntityType.AssetDeleteV1, updateId: id, data }));
response.write(serialize({ type: SyncEntityType.AssetDeleteV1, ids: [id], data }));
}
const upserts = this.syncRepository.getAssetUpserts(auth.user.id, checkpointMap[SyncEntityType.AssetV1]);
@ -140,7 +140,7 @@ export class SyncService extends BaseService {
response.write(
serialize({
type: SyncEntityType.AssetV1,
updateId,
ids: [updateId],
data: {
...data,
checksum: hexOrBufferToBase64(checksum),
@ -159,7 +159,60 @@ export class SyncService extends BaseService {
checkpointMap[SyncEntityType.PartnerAssetDeleteV1],
);
for await (const { id, ...data } of deletes) {
response.write(serialize({ type: SyncEntityType.PartnerAssetDeleteV1, updateId: id, data }));
response.write(serialize({ type: SyncEntityType.PartnerAssetDeleteV1, ids: [id], data }));
}
const checkpoint = checkpointMap[SyncEntityType.PartnerAssetBackfillV1];
const partnerAssetCheckpoint = checkpointMap[SyncEntityType.PartnerAssetV1];
const partners = await this.syncRepository.getPartnerBackfill(auth.user.id, checkpoint?.updateId);
if (partnerAssetCheckpoint) {
for (const partner of partners) {
if (partner.createId === checkpoint?.updateId && checkpoint.extraId === 'complete') {
continue;
}
const partnerCheckpoint = checkpoint?.updateId === partner.createId ? checkpoint?.extraId : undefined;
const backfill = this.syncRepository.getPartnerAssetsBackfill(
partner.sharedById,
partnerCheckpoint,
partnerAssetCheckpoint.updateId,
);
for await (const { updateId, checksum, thumbhash, ...data } of backfill) {
response.write(
serialize({
type: SyncEntityType.PartnerAssetBackfillV1,
ids: [updateId],
data: {
...data,
checksum: hexOrBufferToBase64(checksum),
thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null,
},
}),
);
}
response.write(
serialize({
type: SyncEntityType.SyncAckV1,
data: {},
ackType: SyncEntityType.PartnerAssetBackfillV1,
ids: [partner.sharedById, 'complete'],
}),
);
}
} else if (partners.length > 0) {
await this.syncRepository.upsertCheckpoints([
{
type: SyncEntityType.PartnerAssetBackfillV1,
sessionId,
ack: toAck({
type: SyncEntityType.PartnerAssetBackfillV1,
updateId: partners.at(-1)!.createId,
extraId: 'complete',
}),
},
]);
}
const upserts = this.syncRepository.getPartnerAssetsUpserts(
@ -170,7 +223,7 @@ export class SyncService extends BaseService {
response.write(
serialize({
type: SyncEntityType.PartnerAssetV1,
updateId,
ids: [updateId],
data: {
...data,
checksum: hexOrBufferToBase64(checksum),
@ -189,19 +242,74 @@ export class SyncService extends BaseService {
checkpointMap[SyncEntityType.AssetExifV1],
);
for await (const { updateId, ...data } of upserts) {
response.write(serialize({ type: SyncEntityType.AssetExifV1, updateId, data }));
response.write(serialize({ type: SyncEntityType.AssetExifV1, ids: [updateId], data }));
}
break;
}
case SyncRequestType.PartnerAssetExifsV1: {
const checkpoint = checkpointMap[SyncEntityType.PartnerAssetExifBackfillV1];
const partnerAssetCheckpoint = checkpointMap[SyncEntityType.PartnerAssetExifV1];
const partners = await this.syncRepository.getPartnerBackfill(auth.user.id, checkpoint?.updateId);
if (partnerAssetCheckpoint) {
for (const partner of partners) {
if (partner.createId === checkpoint?.updateId && checkpoint.extraId === 'complete') {
continue;
}
const partnerCheckpoint = checkpoint?.updateId === partner.createId ? checkpoint?.extraId : undefined;
const backfill = this.syncRepository.getPartnerAssetExifsBackfill(
partner.sharedById,
partnerCheckpoint,
partnerAssetCheckpoint.updateId,
);
for await (const { updateId, ...data } of backfill) {
response.write(
serialize({
type: SyncEntityType.PartnerAssetExifBackfillV1,
ids: [updateId],
data,
}),
);
}
response.write(
serialize({
type: SyncEntityType.SyncAckV1,
data: {},
ackType: SyncEntityType.PartnerAssetExifBackfillV1,
ids: [partner.sharedById, 'complete'],
}),
);
}
} else if (partners.length > 0) {
await this.syncRepository.upsertCheckpoints([
{
type: SyncEntityType.PartnerAssetExifBackfillV1,
sessionId,
ack: toAck({
type: SyncEntityType.PartnerAssetExifBackfillV1,
updateId: partners.at(-1)!.createId,
extraId: 'complete',
}),
},
]);
}
const upserts = this.syncRepository.getPartnerAssetExifsUpserts(
auth.user.id,
checkpointMap[SyncEntityType.PartnerAssetExifV1],
);
for await (const { updateId, ...data } of upserts) {
response.write(serialize({ type: SyncEntityType.PartnerAssetExifV1, updateId, data }));
response.write(
serialize({
type: SyncEntityType.PartnerAssetExifV1,
ids: [updateId],
data,
}),
);
}
break;
@ -213,12 +321,12 @@ export class SyncService extends BaseService {
checkpointMap[SyncEntityType.AlbumDeleteV1],
);
for await (const { id, ...data } of deletes) {
response.write(serialize({ type: SyncEntityType.AlbumDeleteV1, updateId: id, data }));
response.write(serialize({ type: SyncEntityType.AlbumDeleteV1, ids: [id], data }));
}
const upserts = this.syncRepository.getAlbumUpserts(auth.user.id, checkpointMap[SyncEntityType.AlbumV1]);
for await (const { updateId, ...data } of upserts) {
response.write(serialize({ type: SyncEntityType.AlbumV1, updateId, data }));
response.write(serialize({ type: SyncEntityType.AlbumV1, ids: [updateId], data }));
}
break;
@ -230,7 +338,7 @@ export class SyncService extends BaseService {
checkpointMap[SyncEntityType.AlbumUserDeleteV1],
);
for await (const { id, ...data } of deletes) {
response.write(serialize({ type: SyncEntityType.AlbumUserDeleteV1, updateId: id, data }));
response.write(serialize({ type: SyncEntityType.AlbumUserDeleteV1, ids: [id], data }));
}
const upserts = this.syncRepository.getAlbumUserUpserts(
@ -238,7 +346,7 @@ export class SyncService extends BaseService {
checkpointMap[SyncEntityType.AlbumUserV1],
);
for await (const { updateId, ...data } of upserts) {
response.write(serialize({ type: SyncEntityType.AlbumUserV1, updateId, data }));
response.write(serialize({ type: SyncEntityType.AlbumUserV1, ids: [updateId], data }));
}
break;

View file

@ -421,6 +421,7 @@ export interface IBulkAsset {
export type SyncAck = {
type: SyncEntityType;
updateId: string;
extraId?: string;
};
export type StorageAsset = {

View file

@ -9,20 +9,23 @@ type Impossible<K extends keyof any> = {
type Exact<T, U extends T = T> = U & Impossible<Exclude<keyof U, keyof T>>;
export const fromAck = (ack: string): SyncAck => {
const [type, updateId] = ack.split('|');
return { type: type as SyncEntityType, updateId };
const [type, updateId, extraId] = ack.split('|');
return { type: type as SyncEntityType, updateId, extraId };
};
export const toAck = ({ type, updateId }: SyncAck) => [type, updateId].join('|');
export const toAck = ({ type, updateId, extraId }: SyncAck) =>
[type, updateId, extraId].filter((v) => v !== undefined).join('|');
export const mapJsonLine = (object: unknown) => JSON.stringify(object) + '\n';
export const serialize = <T extends keyof SyncItem, D extends SyncItem[T]>({
type,
updateId,
data,
ids,
ackType,
}: {
type: T;
updateId: string;
data: Exact<SyncItem[T], D>;
}) => mapJsonLine({ type, data, ack: toAck({ type, updateId }) });
ids: [string] | [string, string];
ackType?: SyncEntityType;
}) => mapJsonLine({ type, data, ack: toAck({ type: ackType ?? type, updateId: ids[0], extraId: ids[1] }) });