mirror of
https://github.com/immich-app/immich
synced 2025-11-14 17:36:12 +00:00
feat: sync assets, partner assets, exif, and partner exif (#16658)
* feat: sync assets, partner assets, exif, and partner exif Co-authored-by: Zack Pollard <zack@futo.org> Co-authored-by: Alex Tran <alex.tran1502@gmail.com> * refactor: remove duplicate where clause and orderBy statements in sync queries * fix: asset deletes not filtering by ownerId --------- Co-authored-by: Zack Pollard <zack@futo.org> Co-authored-by: Alex Tran <alex.tran1502@gmail.com> Co-authored-by: Zack Pollard <zackpollard@ymail.com>
This commit is contained in:
parent
e97df503f2
commit
a96bba4b26
28 changed files with 2037 additions and 46 deletions
|
|
@ -117,4 +117,46 @@ export const columns = {
|
|||
userDto: ['id', 'name', 'email', 'profileImagePath', 'profileChangedAt'],
|
||||
tagDto: ['id', 'value', 'createdAt', 'updatedAt', 'color', 'parentId'],
|
||||
apiKey: ['id', 'name', 'userId', 'createdAt', 'updatedAt', 'permissions'],
|
||||
syncAsset: [
|
||||
'id',
|
||||
'ownerId',
|
||||
'thumbhash',
|
||||
'checksum',
|
||||
'fileCreatedAt',
|
||||
'fileModifiedAt',
|
||||
'localDateTime',
|
||||
'type',
|
||||
'deletedAt',
|
||||
'isFavorite',
|
||||
'isVisible',
|
||||
'updateId',
|
||||
],
|
||||
syncAssetExif: [
|
||||
'exif.assetId',
|
||||
'exif.description',
|
||||
'exif.exifImageWidth',
|
||||
'exif.exifImageHeight',
|
||||
'exif.fileSizeInByte',
|
||||
'exif.orientation',
|
||||
'exif.dateTimeOriginal',
|
||||
'exif.modifyDate',
|
||||
'exif.timeZone',
|
||||
'exif.latitude',
|
||||
'exif.longitude',
|
||||
'exif.projectionType',
|
||||
'exif.city',
|
||||
'exif.state',
|
||||
'exif.country',
|
||||
'exif.make',
|
||||
'exif.model',
|
||||
'exif.lensModel',
|
||||
'exif.fNumber',
|
||||
'exif.focalLength',
|
||||
'exif.iso',
|
||||
'exif.exposureTime',
|
||||
'exif.profileDescription',
|
||||
'exif.rating',
|
||||
'exif.fps',
|
||||
'exif.updateId',
|
||||
],
|
||||
} as const;
|
||||
|
|
|
|||
10
server/src/db.d.ts
vendored
10
server/src/db.d.ts
vendored
|
|
@ -119,6 +119,13 @@ export interface AssetJobStatus {
|
|||
thumbnailAt: Timestamp | null;
|
||||
}
|
||||
|
||||
export interface AssetsAudit {
|
||||
deletedAt: Generated<Timestamp>;
|
||||
id: Generated<string>;
|
||||
assetId: string;
|
||||
ownerId: string;
|
||||
}
|
||||
|
||||
export interface Assets {
|
||||
checksum: Buffer;
|
||||
createdAt: Generated<Timestamp>;
|
||||
|
|
@ -168,6 +175,8 @@ export interface Audit {
|
|||
|
||||
export interface Exif {
|
||||
assetId: string;
|
||||
updateId: Generated<string>;
|
||||
updatedAt: Generated<Timestamp>;
|
||||
autoStackId: string | null;
|
||||
bitsPerSample: number | null;
|
||||
city: string | null;
|
||||
|
|
@ -459,6 +468,7 @@ export interface DB {
|
|||
asset_job_status: AssetJobStatus;
|
||||
asset_stack: AssetStack;
|
||||
assets: Assets;
|
||||
assets_audit: AssetsAudit;
|
||||
audit: Audit;
|
||||
exif: Exif;
|
||||
face_search: FaceSearch;
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ const mapStack = (entity: AssetEntity) => {
|
|||
};
|
||||
|
||||
// if an asset is jsonified in the DB before being returned, its buffer fields will be hex-encoded strings
|
||||
const hexOrBufferToBase64 = (encoded: string | Buffer) => {
|
||||
export const hexOrBufferToBase64 = (encoded: string | Buffer) => {
|
||||
if (typeof encoded === 'string') {
|
||||
return Buffer.from(encoded.slice(2), 'hex').toString('base64');
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import { ApiProperty } from '@nestjs/swagger';
|
||||
import { IsEnum, IsInt, IsPositive, IsString } from 'class-validator';
|
||||
import { AssetResponseDto } from 'src/dtos/asset-response.dto';
|
||||
import { SyncEntityType, SyncRequestType } from 'src/enum';
|
||||
import { AssetType, SyncEntityType, SyncRequestType } from 'src/enum';
|
||||
import { Optional, ValidateDate, ValidateUUID } from 'src/validation';
|
||||
|
||||
export class AssetFullSyncDto {
|
||||
|
|
@ -56,11 +56,73 @@ export class SyncPartnerDeleteV1 {
|
|||
sharedWithId!: string;
|
||||
}
|
||||
|
||||
export class SyncAssetV1 {
|
||||
id!: string;
|
||||
ownerId!: string;
|
||||
thumbhash!: string | null;
|
||||
checksum!: string;
|
||||
fileCreatedAt!: Date | null;
|
||||
fileModifiedAt!: Date | null;
|
||||
localDateTime!: Date | null;
|
||||
type!: AssetType;
|
||||
deletedAt!: Date | null;
|
||||
isFavorite!: boolean;
|
||||
isVisible!: boolean;
|
||||
}
|
||||
|
||||
export class SyncAssetDeleteV1 {
|
||||
assetId!: string;
|
||||
}
|
||||
|
||||
export class SyncAssetExifV1 {
|
||||
assetId!: string;
|
||||
description!: string | null;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
exifImageWidth!: number | null;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
exifImageHeight!: number | null;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
fileSizeInByte!: number | null;
|
||||
orientation!: string | null;
|
||||
dateTimeOriginal!: Date | null;
|
||||
modifyDate!: Date | null;
|
||||
timeZone!: string | null;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
latitude!: number | null;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
longitude!: number | null;
|
||||
projectionType!: string | null;
|
||||
city!: string | null;
|
||||
state!: string | null;
|
||||
country!: string | null;
|
||||
make!: string | null;
|
||||
model!: string | null;
|
||||
lensModel!: string | null;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
fNumber!: number | null;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
focalLength!: number | null;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
iso!: number | null;
|
||||
exposureTime!: string | null;
|
||||
profileDescription!: string | null;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
rating!: number | null;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
fps!: number | null;
|
||||
}
|
||||
|
||||
export type SyncItem = {
|
||||
[SyncEntityType.UserV1]: SyncUserV1;
|
||||
[SyncEntityType.UserDeleteV1]: SyncUserDeleteV1;
|
||||
[SyncEntityType.PartnerV1]: SyncPartnerV1;
|
||||
[SyncEntityType.PartnerDeleteV1]: SyncPartnerDeleteV1;
|
||||
[SyncEntityType.AssetV1]: SyncAssetV1;
|
||||
[SyncEntityType.AssetDeleteV1]: SyncAssetDeleteV1;
|
||||
[SyncEntityType.AssetExifV1]: SyncAssetExifV1;
|
||||
[SyncEntityType.PartnerAssetV1]: SyncAssetV1;
|
||||
[SyncEntityType.PartnerAssetDeleteV1]: SyncAssetDeleteV1;
|
||||
[SyncEntityType.PartnerAssetExifV1]: SyncAssetExifV1;
|
||||
};
|
||||
|
||||
const responseDtos = [
|
||||
|
|
@ -69,6 +131,9 @@ const responseDtos = [
|
|||
SyncUserDeleteV1,
|
||||
SyncPartnerV1,
|
||||
SyncPartnerDeleteV1,
|
||||
SyncAssetV1,
|
||||
SyncAssetDeleteV1,
|
||||
SyncAssetExifV1,
|
||||
];
|
||||
|
||||
export const extraSyncModels = responseDtos;
|
||||
|
|
|
|||
19
server/src/entities/asset-audit.entity.ts
Normal file
19
server/src/entities/asset-audit.entity.ts
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
import { Column, CreateDateColumn, Entity, Index, PrimaryColumn } from 'typeorm';
|
||||
|
||||
@Entity('assets_audit')
|
||||
export class AssetAuditEntity {
|
||||
@PrimaryColumn({ type: 'uuid', nullable: false, default: () => 'immich_uuid_v7()' })
|
||||
id!: string;
|
||||
|
||||
@Index('IDX_assets_audit_asset_id')
|
||||
@Column({ type: 'uuid' })
|
||||
assetId!: string;
|
||||
|
||||
@Index('IDX_assets_audit_owner_id')
|
||||
@Column({ type: 'uuid' })
|
||||
ownerId!: string;
|
||||
|
||||
@Index('IDX_assets_audit_deleted_at')
|
||||
@CreateDateColumn({ type: 'timestamptz', default: () => 'clock_timestamp()' })
|
||||
deletedAt!: Date;
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
import { AssetEntity } from 'src/entities/asset.entity';
|
||||
import { Index, JoinColumn, OneToOne, PrimaryColumn } from 'typeorm';
|
||||
import { Index, JoinColumn, OneToOne, PrimaryColumn, UpdateDateColumn } from 'typeorm';
|
||||
import { Column } from 'typeorm/decorator/columns/Column.js';
|
||||
import { Entity } from 'typeorm/decorator/entity/Entity.js';
|
||||
|
||||
|
|
@ -12,6 +12,13 @@ export class ExifEntity {
|
|||
@PrimaryColumn()
|
||||
assetId!: string;
|
||||
|
||||
@UpdateDateColumn({ type: 'timestamptz', default: () => 'clock_timestamp()' })
|
||||
updatedAt?: Date;
|
||||
|
||||
@Index('IDX_asset_exif_update_id')
|
||||
@Column({ type: 'uuid', nullable: false, default: () => 'immich_uuid_v7()' })
|
||||
updateId?: string;
|
||||
|
||||
/* General info */
|
||||
@Column({ type: 'text', default: '' })
|
||||
description!: string; // or caption
|
||||
|
|
|
|||
|
|
@ -549,11 +549,24 @@ export enum DatabaseLock {
|
|||
export enum SyncRequestType {
|
||||
UsersV1 = 'UsersV1',
|
||||
PartnersV1 = 'PartnersV1',
|
||||
AssetsV1 = 'AssetsV1',
|
||||
AssetExifsV1 = 'AssetExifsV1',
|
||||
PartnerAssetsV1 = 'PartnerAssetsV1',
|
||||
PartnerAssetExifsV1 = 'PartnerAssetExifsV1',
|
||||
}
|
||||
|
||||
export enum SyncEntityType {
|
||||
UserV1 = 'UserV1',
|
||||
UserDeleteV1 = 'UserDeleteV1',
|
||||
|
||||
PartnerV1 = 'PartnerV1',
|
||||
PartnerDeleteV1 = 'PartnerDeleteV1',
|
||||
|
||||
AssetV1 = 'AssetV1',
|
||||
AssetDeleteV1 = 'AssetDeleteV1',
|
||||
AssetExifV1 = 'AssetExifV1',
|
||||
|
||||
PartnerAssetV1 = 'PartnerAssetV1',
|
||||
PartnerAssetDeleteV1 = 'PartnerAssetDeleteV1',
|
||||
PartnerAssetExifV1 = 'PartnerAssetExifV1',
|
||||
}
|
||||
|
|
|
|||
37
server/src/migrations/1741191762113-AssetAuditTable.ts
Normal file
37
server/src/migrations/1741191762113-AssetAuditTable.ts
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
import { MigrationInterface, QueryRunner } from "typeorm";
|
||||
|
||||
export class AssetAuditTable1741191762113 implements MigrationInterface {
|
||||
name = 'AssetAuditTable1741191762113'
|
||||
|
||||
public async up(queryRunner: QueryRunner): Promise<void> {
|
||||
await queryRunner.query(`CREATE TABLE "assets_audit" ("id" uuid NOT NULL DEFAULT immich_uuid_v7(), "assetId" uuid NOT NULL, "ownerId" uuid NOT NULL, "deletedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp(), CONSTRAINT "PK_99bd5c015f81a641927a32b4212" PRIMARY KEY ("id"))`);
|
||||
await queryRunner.query(`CREATE INDEX "IDX_assets_audit_asset_id" ON "assets_audit" ("assetId") `);
|
||||
await queryRunner.query(`CREATE INDEX "IDX_assets_audit_owner_id" ON "assets_audit" ("ownerId") `);
|
||||
await queryRunner.query(`CREATE INDEX "IDX_assets_audit_deleted_at" ON "assets_audit" ("deletedAt") `);
|
||||
await queryRunner.query(`CREATE OR REPLACE FUNCTION assets_delete_audit() RETURNS TRIGGER AS
|
||||
$$
|
||||
BEGIN
|
||||
INSERT INTO assets_audit ("assetId", "ownerId")
|
||||
SELECT "id", "ownerId"
|
||||
FROM OLD;
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql`
|
||||
);
|
||||
await queryRunner.query(`CREATE OR REPLACE TRIGGER assets_delete_audit
|
||||
AFTER DELETE ON assets
|
||||
REFERENCING OLD TABLE AS OLD
|
||||
FOR EACH STATEMENT
|
||||
EXECUTE FUNCTION assets_delete_audit();
|
||||
`);
|
||||
}
|
||||
|
||||
public async down(queryRunner: QueryRunner): Promise<void> {
|
||||
await queryRunner.query(`DROP TRIGGER assets_delete_audit`);
|
||||
await queryRunner.query(`DROP FUNCTION assets_delete_audit`);
|
||||
await queryRunner.query(`DROP INDEX "IDX_assets_audit_deleted_at"`);
|
||||
await queryRunner.query(`DROP INDEX "IDX_assets_audit_owner_id"`);
|
||||
await queryRunner.query(`DROP INDEX "IDX_assets_audit_asset_id"`);
|
||||
await queryRunner.query(`DROP TABLE "assets_audit"`);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
import { MigrationInterface, QueryRunner } from 'typeorm';
|
||||
|
||||
export class FixAssetAndUserCascadeConditions1741280328985 implements MigrationInterface {
|
||||
name = 'FixAssetAndUserCascadeConditions1741280328985';
|
||||
|
||||
public async up(queryRunner: QueryRunner): Promise<void> {
|
||||
await queryRunner.query(`
|
||||
CREATE OR REPLACE TRIGGER assets_delete_audit
|
||||
AFTER DELETE ON assets
|
||||
REFERENCING OLD TABLE AS OLD
|
||||
FOR EACH STATEMENT
|
||||
WHEN (pg_trigger_depth() = 0)
|
||||
EXECUTE FUNCTION assets_delete_audit();`);
|
||||
await queryRunner.query(`
|
||||
CREATE OR REPLACE TRIGGER users_delete_audit
|
||||
AFTER DELETE ON users
|
||||
REFERENCING OLD TABLE AS OLD
|
||||
FOR EACH STATEMENT
|
||||
WHEN (pg_trigger_depth() = 0)
|
||||
EXECUTE FUNCTION users_delete_audit();`);
|
||||
await queryRunner.query(`
|
||||
CREATE OR REPLACE TRIGGER partners_delete_audit
|
||||
AFTER DELETE ON partners
|
||||
REFERENCING OLD TABLE AS OLD
|
||||
FOR EACH STATEMENT
|
||||
WHEN (pg_trigger_depth() = 0)
|
||||
EXECUTE FUNCTION partners_delete_audit();`);
|
||||
}
|
||||
|
||||
public async down(queryRunner: QueryRunner): Promise<void> {
|
||||
await queryRunner.query(`
|
||||
CREATE OR REPLACE TRIGGER assets_delete_audit
|
||||
AFTER DELETE ON assets
|
||||
REFERENCING OLD TABLE AS OLD
|
||||
FOR EACH STATEMENT
|
||||
EXECUTE FUNCTION assets_delete_audit();`);
|
||||
await queryRunner.query(`
|
||||
CREATE OR REPLACE TRIGGER users_delete_audit
|
||||
AFTER DELETE ON users
|
||||
REFERENCING OLD TABLE AS OLD
|
||||
FOR EACH STATEMENT
|
||||
EXECUTE FUNCTION users_delete_audit();`);
|
||||
await queryRunner.query(`
|
||||
CREATE OR REPLACE TRIGGER partners_delete_audit
|
||||
AFTER DELETE ON partners
|
||||
REFERENCING OLD TABLE AS OLD
|
||||
FOR EACH STATEMENT
|
||||
EXECUTE FUNCTION partners_delete_audit();`);
|
||||
}
|
||||
}
|
||||
25
server/src/migrations/1741281344519-AddExifUpdateId.ts
Normal file
25
server/src/migrations/1741281344519-AddExifUpdateId.ts
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
import { MigrationInterface, QueryRunner } from 'typeorm';
|
||||
|
||||
export class AddExifUpdateId1741281344519 implements MigrationInterface {
|
||||
name = 'AddExifUpdateId1741281344519';
|
||||
|
||||
public async up(queryRunner: QueryRunner): Promise<void> {
|
||||
await queryRunner.query(
|
||||
`ALTER TABLE "exif" ADD "updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT clock_timestamp()`,
|
||||
);
|
||||
await queryRunner.query(`ALTER TABLE "exif" ADD "updateId" uuid NOT NULL DEFAULT immich_uuid_v7()`);
|
||||
await queryRunner.query(`CREATE INDEX "IDX_asset_exif_update_id" ON "exif" ("updateId") `);
|
||||
await queryRunner.query(`
|
||||
create trigger asset_exif_updated_at
|
||||
before update on exif
|
||||
for each row execute procedure updated_at()
|
||||
`);
|
||||
}
|
||||
|
||||
public async down(queryRunner: QueryRunner): Promise<void> {
|
||||
await queryRunner.query(`DROP INDEX "public"."IDX_asset_exif_update_id"`);
|
||||
await queryRunner.query(`ALTER TABLE "exif" DROP COLUMN "updateId"`);
|
||||
await queryRunner.query(`ALTER TABLE "exif" DROP COLUMN "updatedAt"`);
|
||||
await queryRunner.query(`DROP TRIGGER asset_exif_updated_at on exif`);
|
||||
}
|
||||
}
|
||||
|
|
@ -420,8 +420,8 @@ from
|
|||
) as "stacked_assets" on "asset_stack"."id" is not null
|
||||
where
|
||||
"assets"."ownerId" = $1::uuid
|
||||
and "isVisible" = $2
|
||||
and "updatedAt" <= $3
|
||||
and "assets"."isVisible" = $2
|
||||
and "assets"."updatedAt" <= $3
|
||||
and "assets"."id" > $4
|
||||
order by
|
||||
"assets"."id"
|
||||
|
|
@ -450,7 +450,7 @@ from
|
|||
) as "stacked_assets" on "asset_stack"."id" is not null
|
||||
where
|
||||
"assets"."ownerId" = any ($1::uuid[])
|
||||
and "isVisible" = $2
|
||||
and "updatedAt" > $3
|
||||
and "assets"."isVisible" = $2
|
||||
and "assets"."updatedAt" > $3
|
||||
limit
|
||||
$4
|
||||
|
|
|
|||
|
|
@ -551,7 +551,7 @@ export class AssetRepository {
|
|||
return this.getById(asset.id, { exifInfo: true, faces: { person: true } }) as Promise<AssetEntity>;
|
||||
}
|
||||
|
||||
async remove(asset: AssetEntity): Promise<void> {
|
||||
async remove(asset: { id: string }): Promise<void> {
|
||||
await this.db.deleteFrom('assets').where('id', '=', asUuid(asset.id)).execute();
|
||||
}
|
||||
|
||||
|
|
@ -968,8 +968,8 @@ export class AssetRepository {
|
|||
)
|
||||
.select((eb) => eb.fn.toJson(eb.table('stacked_assets')).as('stack'))
|
||||
.where('assets.ownerId', '=', asUuid(ownerId))
|
||||
.where('isVisible', '=', true)
|
||||
.where('updatedAt', '<=', updatedUntil)
|
||||
.where('assets.isVisible', '=', true)
|
||||
.where('assets.updatedAt', '<=', updatedUntil)
|
||||
.$if(!!lastId, (qb) => qb.where('assets.id', '>', lastId!))
|
||||
.orderBy('assets.id')
|
||||
.limit(limit)
|
||||
|
|
@ -996,8 +996,8 @@ export class AssetRepository {
|
|||
)
|
||||
.select((eb) => eb.fn.toJson(eb.table('stacked_assets')).as('stack'))
|
||||
.where('assets.ownerId', '=', anyUuid(options.userIds))
|
||||
.where('isVisible', '=', true)
|
||||
.where('updatedAt', '>', options.updatedAfter)
|
||||
.where('assets.isVisible', '=', true)
|
||||
.where('assets.updatedAt', '>', options.updatedAfter)
|
||||
.limit(options.limit)
|
||||
.execute() as any as Promise<AssetEntity[]>;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,14 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
import { Insertable, Kysely, sql } from 'kysely';
|
||||
import { Insertable, Kysely, SelectQueryBuilder, sql } from 'kysely';
|
||||
import { InjectKysely } from 'nestjs-kysely';
|
||||
import { columns } from 'src/database';
|
||||
import { DB, SessionSyncCheckpoints } from 'src/db';
|
||||
import { SyncEntityType } from 'src/enum';
|
||||
import { SyncAck } from 'src/types';
|
||||
|
||||
type auditTables = 'users_audit' | 'partners_audit' | 'assets_audit';
|
||||
type upsertTables = 'users' | 'partners' | 'assets' | 'exif';
|
||||
|
||||
@Injectable()
|
||||
export class SyncRepository {
|
||||
constructor(@InjectKysely() private db: Kysely<DB>) {}
|
||||
|
|
@ -41,9 +45,7 @@ export class SyncRepository {
|
|||
return this.db
|
||||
.selectFrom('users')
|
||||
.select(['id', 'name', 'email', 'deletedAt', 'updateId'])
|
||||
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
|
||||
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
|
||||
.orderBy(['updateId asc'])
|
||||
.$call((qb) => this.upsertTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
|
|
@ -51,9 +53,7 @@ export class SyncRepository {
|
|||
return this.db
|
||||
.selectFrom('users_audit')
|
||||
.select(['id', 'userId'])
|
||||
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
|
||||
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
|
||||
.orderBy(['id asc'])
|
||||
.$call((qb) => this.auditTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
|
|
@ -61,10 +61,8 @@ export class SyncRepository {
|
|||
return this.db
|
||||
.selectFrom('partners')
|
||||
.select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId'])
|
||||
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
|
||||
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
|
||||
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
|
||||
.orderBy(['updateId asc'])
|
||||
.$call((qb) => this.upsertTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
|
|
@ -72,10 +70,93 @@ export class SyncRepository {
|
|||
return this.db
|
||||
.selectFrom('partners_audit')
|
||||
.select(['id', 'sharedById', 'sharedWithId'])
|
||||
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
|
||||
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
|
||||
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
|
||||
.orderBy(['id asc'])
|
||||
.$call((qb) => this.auditTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
getAssetUpserts(userId: string, ack?: SyncAck) {
|
||||
return this.db
|
||||
.selectFrom('assets')
|
||||
.select(columns.syncAsset)
|
||||
.where('ownerId', '=', userId)
|
||||
.$call((qb) => this.upsertTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
getPartnerAssetsUpserts(userId: string, ack?: SyncAck) {
|
||||
return this.db
|
||||
.selectFrom('assets')
|
||||
.select(columns.syncAsset)
|
||||
.where('ownerId', 'in', (eb) =>
|
||||
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
|
||||
)
|
||||
.$call((qb) => this.upsertTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
getAssetDeletes(userId: string, ack?: SyncAck) {
|
||||
return this.db
|
||||
.selectFrom('assets_audit')
|
||||
.select(['id', 'assetId'])
|
||||
.where('ownerId', '=', userId)
|
||||
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
|
||||
.$call((qb) => this.auditTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
getPartnerAssetDeletes(userId: string, ack?: SyncAck) {
|
||||
return this.db
|
||||
.selectFrom('assets_audit')
|
||||
.select(['id', 'assetId'])
|
||||
.where('ownerId', 'in', (eb) =>
|
||||
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
|
||||
)
|
||||
.$call((qb) => this.auditTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
getAssetExifsUpserts(userId: string, ack?: SyncAck) {
|
||||
return this.db
|
||||
.selectFrom('exif')
|
||||
.select(columns.syncAssetExif)
|
||||
.where('assetId', 'in', (eb) => eb.selectFrom('assets').select('id').where('ownerId', '=', userId))
|
||||
.$call((qb) => this.upsertTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
getPartnerAssetExifsUpserts(userId: string, ack?: SyncAck) {
|
||||
return this.db
|
||||
.selectFrom('exif')
|
||||
.select(columns.syncAssetExif)
|
||||
.where('assetId', 'in', (eb) =>
|
||||
eb
|
||||
.selectFrom('assets')
|
||||
.select('id')
|
||||
.where('ownerId', 'in', (eb) =>
|
||||
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
|
||||
),
|
||||
)
|
||||
.$call((qb) => this.upsertTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
private auditTableFilters<T extends keyof Pick<DB, auditTables>, D>(qb: SelectQueryBuilder<DB, T, D>, ack?: SyncAck) {
|
||||
const builder = qb as SelectQueryBuilder<DB, auditTables, D>;
|
||||
return builder
|
||||
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
|
||||
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
|
||||
.orderBy(['id asc']) as SelectQueryBuilder<DB, T, D>;
|
||||
}
|
||||
|
||||
private upsertTableFilters<T extends keyof Pick<DB, upsertTables>, D>(
|
||||
qb: SelectQueryBuilder<DB, T, D>,
|
||||
ack?: SyncAck,
|
||||
) {
|
||||
const builder = qb as SelectQueryBuilder<DB, upsertTables, D>;
|
||||
return builder
|
||||
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
|
||||
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
|
||||
.orderBy(['updateId asc']) as SelectQueryBuilder<DB, T, D>;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ import { DateTime } from 'luxon';
|
|||
import { Writable } from 'node:stream';
|
||||
import { AUDIT_LOG_MAX_DURATION } from 'src/constants';
|
||||
import { SessionSyncCheckpoints } from 'src/db';
|
||||
import { AssetResponseDto, mapAsset } from 'src/dtos/asset-response.dto';
|
||||
import { AssetResponseDto, hexOrBufferToBase64, mapAsset } from 'src/dtos/asset-response.dto';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import {
|
||||
AssetDeltaSyncDto,
|
||||
|
|
@ -22,10 +22,14 @@ import { setIsEqual } from 'src/utils/set';
|
|||
import { fromAck, serialize } from 'src/utils/sync';
|
||||
|
||||
const FULL_SYNC = { needsFullSync: true, deleted: [], upserted: [] };
|
||||
const SYNC_TYPES_ORDER = [
|
||||
export const SYNC_TYPES_ORDER = [
|
||||
//
|
||||
SyncRequestType.UsersV1,
|
||||
SyncRequestType.PartnersV1,
|
||||
SyncRequestType.AssetsV1,
|
||||
SyncRequestType.AssetExifsV1,
|
||||
SyncRequestType.PartnerAssetsV1,
|
||||
SyncRequestType.PartnerAssetExifsV1,
|
||||
];
|
||||
|
||||
const throwSessionRequired = () => {
|
||||
|
|
@ -49,17 +53,22 @@ export class SyncService extends BaseService {
|
|||
return throwSessionRequired();
|
||||
}
|
||||
|
||||
const checkpoints: Insertable<SessionSyncCheckpoints>[] = [];
|
||||
const checkpoints: Record<string, Insertable<SessionSyncCheckpoints>> = {};
|
||||
for (const ack of dto.acks) {
|
||||
const { type } = fromAck(ack);
|
||||
// TODO proper ack validation via class validator
|
||||
if (!Object.values(SyncEntityType).includes(type)) {
|
||||
throw new BadRequestException(`Invalid ack type: ${type}`);
|
||||
}
|
||||
checkpoints.push({ sessionId, type, ack });
|
||||
|
||||
if (checkpoints[type]) {
|
||||
throw new BadRequestException('Only one ack per type is allowed');
|
||||
}
|
||||
|
||||
checkpoints[type] = { sessionId, type, ack };
|
||||
}
|
||||
|
||||
await this.syncRepository.upsertCheckpoints(checkpoints);
|
||||
await this.syncRepository.upsertCheckpoints(Object.values(checkpoints));
|
||||
}
|
||||
|
||||
async deleteAcks(auth: AuthDto, dto: SyncAckDeleteDto) {
|
||||
|
|
@ -115,6 +124,87 @@ export class SyncService extends BaseService {
|
|||
break;
|
||||
}
|
||||
|
||||
case SyncRequestType.AssetsV1: {
|
||||
const deletes = this.syncRepository.getAssetDeletes(
|
||||
auth.user.id,
|
||||
checkpointMap[SyncEntityType.AssetDeleteV1],
|
||||
);
|
||||
for await (const { id, ...data } of deletes) {
|
||||
response.write(serialize({ type: SyncEntityType.AssetDeleteV1, updateId: id, data }));
|
||||
}
|
||||
|
||||
const upserts = this.syncRepository.getAssetUpserts(auth.user.id, checkpointMap[SyncEntityType.AssetV1]);
|
||||
for await (const { updateId, checksum, thumbhash, ...data } of upserts) {
|
||||
response.write(
|
||||
serialize({
|
||||
type: SyncEntityType.AssetV1,
|
||||
updateId,
|
||||
data: {
|
||||
...data,
|
||||
checksum: hexOrBufferToBase64(checksum),
|
||||
thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null,
|
||||
},
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case SyncRequestType.PartnerAssetsV1: {
|
||||
const deletes = this.syncRepository.getPartnerAssetDeletes(
|
||||
auth.user.id,
|
||||
checkpointMap[SyncEntityType.PartnerAssetDeleteV1],
|
||||
);
|
||||
for await (const { id, ...data } of deletes) {
|
||||
response.write(serialize({ type: SyncEntityType.PartnerAssetDeleteV1, updateId: id, data }));
|
||||
}
|
||||
|
||||
const upserts = this.syncRepository.getPartnerAssetsUpserts(
|
||||
auth.user.id,
|
||||
checkpointMap[SyncEntityType.PartnerAssetV1],
|
||||
);
|
||||
for await (const { updateId, checksum, thumbhash, ...data } of upserts) {
|
||||
response.write(
|
||||
serialize({
|
||||
type: SyncEntityType.PartnerAssetV1,
|
||||
updateId,
|
||||
data: {
|
||||
...data,
|
||||
checksum: hexOrBufferToBase64(checksum),
|
||||
thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null,
|
||||
},
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case SyncRequestType.AssetExifsV1: {
|
||||
const upserts = this.syncRepository.getAssetExifsUpserts(
|
||||
auth.user.id,
|
||||
checkpointMap[SyncEntityType.AssetExifV1],
|
||||
);
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
response.write(serialize({ type: SyncEntityType.AssetExifV1, updateId, data }));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case SyncRequestType.PartnerAssetExifsV1: {
|
||||
const upserts = this.syncRepository.getPartnerAssetExifsUpserts(
|
||||
auth.user.id,
|
||||
checkpointMap[SyncEntityType.PartnerAssetExifV1],
|
||||
);
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
response.write(serialize({ type: SyncEntityType.PartnerAssetExifV1, updateId, data }));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
default: {
|
||||
this.logger.warn(`Unsupported sync type: ${type}`);
|
||||
break;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue