feat: sync implementation for the user entity (#16234)

* ci: print out typeorm generation changes

* feat: sync implementation for the user entity

wip

---------

Co-authored-by: Jason Rasmussen <jason@rasm.me>
This commit is contained in:
Zack Pollard 2025-02-21 04:37:57 +00:00 committed by GitHub
parent 02cd8da871
commit ac36effb45
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
38 changed files with 1774 additions and 10 deletions

View file

@ -29,7 +29,7 @@ import { AuthService } from 'src/services/auth.service';
import { CliService } from 'src/services/cli.service';
import { DatabaseService } from 'src/services/database.service';
const common = [...repositories, ...services];
const common = [...repositories, ...services, GlobalExceptionFilter];
const middleware = [
FileUploadInterceptor,

View file

@ -1,15 +1,28 @@
import { Body, Controller, HttpCode, HttpStatus, Post } from '@nestjs/common';
import { Body, Controller, Delete, Get, Header, HttpCode, HttpStatus, Post, Res } from '@nestjs/common';
import { ApiTags } from '@nestjs/swagger';
import { Response } from 'express';
import { AssetResponseDto } from 'src/dtos/asset-response.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import { AssetDeltaSyncDto, AssetDeltaSyncResponseDto, AssetFullSyncDto } from 'src/dtos/sync.dto';
import {
AssetDeltaSyncDto,
AssetDeltaSyncResponseDto,
AssetFullSyncDto,
SyncAckDeleteDto,
SyncAckDto,
SyncAckSetDto,
SyncStreamDto,
} from 'src/dtos/sync.dto';
import { Auth, Authenticated } from 'src/middleware/auth.guard';
import { GlobalExceptionFilter } from 'src/middleware/global-exception.filter';
import { SyncService } from 'src/services/sync.service';
@ApiTags('Sync')
@Controller('sync')
export class SyncController {
constructor(private service: SyncService) {}
constructor(
private service: SyncService,
private errorService: GlobalExceptionFilter,
) {}
@Post('full-sync')
@HttpCode(HttpStatus.OK)
@ -24,4 +37,37 @@ export class SyncController {
getDeltaSync(@Auth() auth: AuthDto, @Body() dto: AssetDeltaSyncDto): Promise<AssetDeltaSyncResponseDto> {
return this.service.getDeltaSync(auth, dto);
}
@Post('stream')
@Header('Content-Type', 'application/jsonlines+json')
@HttpCode(HttpStatus.OK)
@Authenticated()
async getSyncStream(@Auth() auth: AuthDto, @Res() res: Response, @Body() dto: SyncStreamDto) {
try {
await this.service.stream(auth, res, dto);
} catch (error: Error | any) {
res.setHeader('Content-Type', 'application/json');
this.errorService.handleError(res, error);
}
}
@Get('ack')
@Authenticated()
getSyncAck(@Auth() auth: AuthDto): Promise<SyncAckDto[]> {
return this.service.getAcks(auth);
}
@Post('ack')
@HttpCode(HttpStatus.NO_CONTENT)
@Authenticated()
sendSyncAck(@Auth() auth: AuthDto, @Body() dto: SyncAckSetDto) {
return this.service.setAcks(auth, dto);
}
@Delete('ack')
@HttpCode(HttpStatus.NO_CONTENT)
@Authenticated()
deleteSyncAck(@Auth() auth: AuthDto, @Body() dto: SyncAckDeleteDto) {
return this.service.deleteAcks(auth, dto);
}
}

View file

@ -1,3 +1,4 @@
import { sql } from 'kysely';
import { Permission } from 'src/enum';
export type AuthUser = {
@ -29,6 +30,8 @@ export type AuthSession = {
};
export const columns = {
ackEpoch: (columnName: 'createdAt' | 'updatedAt' | 'deletedAt') =>
sql.raw<string>(`extract(epoch from "${columnName}")::text`).as('ackEpoch'),
authUser: [
'users.id',
'users.name',

18
server/src/db.d.ts vendored
View file

@ -4,7 +4,7 @@
*/
import type { ColumnType } from 'kysely';
import { Permission } from 'src/enum';
import { Permission, SyncEntityType } from 'src/enum';
export type ArrayType<T> = ArrayTypeImpl<T> extends (infer U)[] ? U[] : ArrayTypeImpl<T>;
@ -294,6 +294,15 @@ export interface Sessions {
userId: string;
}
export interface SessionSyncCheckpoints {
ack: string;
createdAt: Generated<Timestamp>;
sessionId: string;
type: SyncEntityType;
updatedAt: Generated<Timestamp>;
}
export interface SharedLinkAsset {
assetsId: string;
sharedLinksId: string;
@ -384,6 +393,11 @@ export interface Users {
updatedAt: Generated<Timestamp>;
}
export interface UsersAudit {
userId: string;
deletedAt: Generated<Timestamp>;
}
export interface VectorsPgVectorIndexStat {
idx_growing: ArrayType<Int8> | null;
idx_indexing: boolean | null;
@ -429,6 +443,7 @@ export interface DB {
partners: Partners;
person: Person;
sessions: Sessions;
session_sync_checkpoints: SessionSyncCheckpoints;
shared_link__asset: SharedLinkAsset;
shared_links: SharedLinks;
smart_search: SmartSearch;
@ -440,6 +455,7 @@ export interface DB {
typeorm_metadata: TypeormMetadata;
user_metadata: UserMetadata;
users: Users;
users_audit: UsersAudit;
'vectors.pg_vector_index_stat': VectorsPgVectorIndexStat;
version_history: VersionHistory;
}

View file

@ -1,7 +1,8 @@
import { ApiProperty } from '@nestjs/swagger';
import { IsInt, IsPositive } from 'class-validator';
import { IsEnum, IsInt, IsPositive, IsString } from 'class-validator';
import { AssetResponseDto } from 'src/dtos/asset-response.dto';
import { ValidateDate, ValidateUUID } from 'src/validation';
import { SyncEntityType, SyncRequestType } from 'src/enum';
import { Optional, ValidateDate, ValidateUUID } from 'src/validation';
export class AssetFullSyncDto {
@ValidateUUID({ optional: true })
@ -32,3 +33,51 @@ export class AssetDeltaSyncResponseDto {
upserted!: AssetResponseDto[];
deleted!: string[];
}
export class SyncUserV1 {
id!: string;
name!: string;
email!: string;
deletedAt!: Date | null;
}
export class SyncUserDeleteV1 {
userId!: string;
}
export type SyncItem = {
[SyncEntityType.UserV1]: SyncUserV1;
[SyncEntityType.UserDeleteV1]: SyncUserDeleteV1;
};
const responseDtos = [
//
SyncUserV1,
SyncUserDeleteV1,
];
export const extraSyncModels = responseDtos;
export class SyncStreamDto {
@IsEnum(SyncRequestType, { each: true })
@ApiProperty({ enumName: 'SyncRequestType', enum: SyncRequestType, isArray: true })
types!: SyncRequestType[];
}
export class SyncAckDto {
@ApiProperty({ enumName: 'SyncEntityType', enum: SyncEntityType })
type!: SyncEntityType;
ack!: string;
}
export class SyncAckSetDto {
@IsString({ each: true })
acks!: string[];
}
export class SyncAckDeleteDto {
@IsEnum(SyncEntityType, { each: true })
@ApiProperty({ enumName: 'SyncEntityType', enum: SyncEntityType, isArray: true })
@Optional()
types?: SyncEntityType[];
}

View file

@ -20,8 +20,10 @@ import { SessionEntity } from 'src/entities/session.entity';
import { SharedLinkEntity } from 'src/entities/shared-link.entity';
import { SmartSearchEntity } from 'src/entities/smart-search.entity';
import { StackEntity } from 'src/entities/stack.entity';
import { SessionSyncCheckpointEntity } from 'src/entities/sync-checkpoint.entity';
import { SystemMetadataEntity } from 'src/entities/system-metadata.entity';
import { TagEntity } from 'src/entities/tag.entity';
import { UserAuditEntity } from 'src/entities/user-audit.entity';
import { UserMetadataEntity } from 'src/entities/user-metadata.entity';
import { UserEntity } from 'src/entities/user.entity';
import { VersionHistoryEntity } from 'src/entities/version-history.entity';
@ -44,12 +46,14 @@ export const entities = [
MoveEntity,
PartnerEntity,
PersonEntity,
SessionSyncCheckpointEntity,
SharedLinkEntity,
SmartSearchEntity,
StackEntity,
SystemMetadataEntity,
TagEntity,
UserEntity,
UserAuditEntity,
UserMetadataEntity,
SessionEntity,
LibraryEntity,

View file

@ -0,0 +1,24 @@
import { SessionEntity } from 'src/entities/session.entity';
import { SyncEntityType } from 'src/enum';
import { Column, CreateDateColumn, Entity, ManyToOne, PrimaryColumn, UpdateDateColumn } from 'typeorm';
@Entity('session_sync_checkpoints')
export class SessionSyncCheckpointEntity {
@ManyToOne(() => SessionEntity, { onDelete: 'CASCADE', onUpdate: 'CASCADE' })
session?: SessionEntity;
@PrimaryColumn()
sessionId!: string;
@PrimaryColumn({ type: 'varchar' })
type!: SyncEntityType;
@CreateDateColumn({ type: 'timestamptz' })
createdAt!: Date;
@UpdateDateColumn({ type: 'timestamptz' })
updatedAt!: Date;
@Column()
ack!: string;
}

View file

@ -0,0 +1,14 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm';
@Entity('users_audit')
@Index('IDX_users_audit_deleted_at_asc_user_id_asc', ['deletedAt', 'userId'])
export class UserAuditEntity {
@PrimaryGeneratedColumn('increment')
id!: number;
@Column({ type: 'uuid' })
userId!: string;
@CreateDateColumn({ type: 'timestamptz' })
deletedAt!: Date;
}

View file

@ -10,12 +10,14 @@ import {
CreateDateColumn,
DeleteDateColumn,
Entity,
Index,
OneToMany,
PrimaryGeneratedColumn,
UpdateDateColumn,
} from 'typeorm';
@Entity('users')
@Index('IDX_users_updated_at_asc_id_asc', ['updatedAt', 'id'])
export class UserEntity {
@PrimaryGeneratedColumn('uuid')
id!: string;

View file

@ -537,3 +537,12 @@ export enum DatabaseLock {
GetSystemConfig = 69,
BackupDatabase = 42,
}
export enum SyncRequestType {
UsersV1 = 'UsersV1',
}
export enum SyncEntityType {
UserV1 = 'UserV1',
UserDeleteV1 = 'UserDeleteV1',
}

View file

@ -22,6 +22,13 @@ export class GlobalExceptionFilter implements ExceptionFilter<Error> {
}
}
handleError(res: Response, error: Error) {
const { status, body } = this.fromError(error);
if (!res.headersSent) {
res.status(status).json({ ...body, statusCode: status, correlationId: this.cls.getId() });
}
}
private fromError(error: Error) {
logGlobalError(this.logger, error);

View file

@ -0,0 +1,22 @@
import { MigrationInterface, QueryRunner } from "typeorm";
export class AddSessionSyncCheckpointTable1740001232576 implements MigrationInterface {
name = 'AddSessionSyncCheckpointTable1740001232576'
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE TABLE "session_sync_checkpoints" ("sessionId" uuid NOT NULL, "type" character varying NOT NULL, "createdAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), "updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), "ack" character varying NOT NULL, CONSTRAINT "PK_b846ab547a702863ef7cd9412fb" PRIMARY KEY ("sessionId", "type"))`);
await queryRunner.query(`ALTER TABLE "session_sync_checkpoints" ADD CONSTRAINT "FK_d8ddd9d687816cc490432b3d4bc" FOREIGN KEY ("sessionId") REFERENCES "sessions"("id") ON DELETE CASCADE ON UPDATE CASCADE`);
await queryRunner.query(`
create trigger session_sync_checkpoints_updated_at
before update on session_sync_checkpoints
for each row execute procedure updated_at()
`);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`drop trigger session_sync_checkpoints_updated_at on session_sync_checkpoints`);
await queryRunner.query(`ALTER TABLE "session_sync_checkpoints" DROP CONSTRAINT "FK_d8ddd9d687816cc490432b3d4bc"`);
await queryRunner.query(`DROP TABLE "session_sync_checkpoints"`);
}
}

View file

@ -0,0 +1,34 @@
import { MigrationInterface, QueryRunner } from "typeorm";
export class AddUsersAuditTable1740064899123 implements MigrationInterface {
name = 'AddUsersAuditTable1740064899123'
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE INDEX IF NOT EXISTS "IDX_users_updated_at_asc_id_asc" ON "users" ("updatedAt" ASC, "id" ASC);`)
await queryRunner.query(`CREATE TABLE "users_audit" ("id" SERIAL NOT NULL, "userId" uuid NOT NULL, "deletedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), CONSTRAINT "PK_e9b2bdfd90e7eb5961091175180" PRIMARY KEY ("id"))`);
await queryRunner.query(`CREATE INDEX IF NOT EXISTS "IDX_users_audit_deleted_at_asc_user_id_asc" ON "users_audit" ("deletedAt" ASC, "userId" ASC);`)
await queryRunner.query(`CREATE OR REPLACE FUNCTION users_delete_audit() RETURNS TRIGGER AS
$$
BEGIN
INSERT INTO users_audit ("userId")
SELECT "id"
FROM OLD;
RETURN NULL;
END;
$$ LANGUAGE plpgsql`
);
await queryRunner.query(`CREATE OR REPLACE TRIGGER users_delete_audit
AFTER DELETE ON users
REFERENCING OLD TABLE AS OLD
FOR EACH STATEMENT
EXECUTE FUNCTION users_delete_audit();
`);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TRIGGER users_delete_audit`);
await queryRunner.query(`DROP FUNCTION users_delete_audit`);
await queryRunner.query(`DROP TABLE "users_audit"`);
}
}

View file

@ -30,6 +30,7 @@ import { SessionRepository } from 'src/repositories/session.repository';
import { SharedLinkRepository } from 'src/repositories/shared-link.repository';
import { StackRepository } from 'src/repositories/stack.repository';
import { StorageRepository } from 'src/repositories/storage.repository';
import { SyncRepository } from 'src/repositories/sync.repository';
import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository';
import { TagRepository } from 'src/repositories/tag.repository';
import { TelemetryRepository } from 'src/repositories/telemetry.repository';
@ -71,6 +72,7 @@ export const repositories = [
SharedLinkRepository,
StackRepository,
StorageRepository,
SyncRepository,
SystemMetadataRepository,
TagRepository,
TelemetryRepository,

View file

@ -0,0 +1,80 @@
import { Injectable } from '@nestjs/common';
import { Insertable, Kysely, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { columns } from 'src/database';
import { DB, SessionSyncCheckpoints } from 'src/db';
import { SyncEntityType } from 'src/enum';
import { SyncAck } from 'src/types';
@Injectable()
export class SyncRepository {
constructor(@InjectKysely() private db: Kysely<DB>) {}
getCheckpoints(sessionId: string) {
return this.db
.selectFrom('session_sync_checkpoints')
.select(['type', 'ack'])
.where('sessionId', '=', sessionId)
.execute();
}
upsertCheckpoints(items: Insertable<SessionSyncCheckpoints>[]) {
return this.db
.insertInto('session_sync_checkpoints')
.values(items)
.onConflict((oc) =>
oc.columns(['sessionId', 'type']).doUpdateSet((eb) => ({
ack: eb.ref('excluded.ack'),
})),
)
.execute();
}
deleteCheckpoints(sessionId: string, types?: SyncEntityType[]) {
return this.db
.deleteFrom('session_sync_checkpoints')
.where('sessionId', '=', sessionId)
.$if(!!types, (qb) => qb.where('type', 'in', types!))
.execute();
}
getUserUpserts(ack?: SyncAck) {
return this.db
.selectFrom('users')
.select(['id', 'name', 'email', 'deletedAt'])
.select(columns.ackEpoch('updatedAt'))
.$if(!!ack, (qb) =>
qb.where((eb) =>
eb.or([
eb(eb.fn<Date>('to_timestamp', [sql.val(ack!.ackEpoch)]), '<', eb.ref('updatedAt')),
eb.and([
eb(eb.fn<Date>('to_timestamp', [sql.val(ack!.ackEpoch)]), '<=', eb.ref('updatedAt')),
eb('id', '>', ack!.ids[0]),
]),
]),
),
)
.orderBy(['updatedAt asc', 'id asc'])
.stream();
}
getUserDeletes(ack?: SyncAck) {
return this.db
.selectFrom('users_audit')
.select(['userId'])
.select(columns.ackEpoch('deletedAt'))
.$if(!!ack, (qb) =>
qb.where((eb) =>
eb.or([
eb(eb.fn<Date>('to_timestamp', [sql.val(ack!.ackEpoch)]), '<', eb.ref('deletedAt')),
eb.and([
eb(eb.fn<Date>('to_timestamp', [sql.val(ack!.ackEpoch)]), '<=', eb.ref('deletedAt')),
eb('userId', '>', ack!.ids[0]),
]),
]),
),
)
.orderBy(['deletedAt asc', 'userId asc'])
.stream();
}
}

View file

@ -38,6 +38,7 @@ import { SessionRepository } from 'src/repositories/session.repository';
import { SharedLinkRepository } from 'src/repositories/shared-link.repository';
import { StackRepository } from 'src/repositories/stack.repository';
import { StorageRepository } from 'src/repositories/storage.repository';
import { SyncRepository } from 'src/repositories/sync.repository';
import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository';
import { TagRepository } from 'src/repositories/tag.repository';
import { TelemetryRepository } from 'src/repositories/telemetry.repository';
@ -85,6 +86,7 @@ export class BaseService {
protected sharedLinkRepository: SharedLinkRepository,
protected stackRepository: StackRepository,
protected storageRepository: StorageRepository,
protected syncRepository: SyncRepository,
protected systemMetadataRepository: SystemMetadataRepository,
protected tagRepository: TagRepository,
protected telemetryRepository: TelemetryRepository,

View file

@ -1,18 +1,112 @@
import { Injectable } from '@nestjs/common';
import { ForbiddenException, Injectable } from '@nestjs/common';
import { Insertable } from 'kysely';
import { DateTime } from 'luxon';
import { Writable } from 'node:stream';
import { AUDIT_LOG_MAX_DURATION } from 'src/constants';
import { SessionSyncCheckpoints } from 'src/db';
import { AssetResponseDto, mapAsset } from 'src/dtos/asset-response.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import { AssetDeltaSyncDto, AssetDeltaSyncResponseDto, AssetFullSyncDto } from 'src/dtos/sync.dto';
import { DatabaseAction, EntityType, Permission } from 'src/enum';
import {
AssetDeltaSyncDto,
AssetDeltaSyncResponseDto,
AssetFullSyncDto,
SyncAckDeleteDto,
SyncAckSetDto,
SyncStreamDto,
} from 'src/dtos/sync.dto';
import { DatabaseAction, EntityType, Permission, SyncEntityType, SyncRequestType } from 'src/enum';
import { BaseService } from 'src/services/base.service';
import { SyncAck } from 'src/types';
import { getMyPartnerIds } from 'src/utils/asset.util';
import { setIsEqual } from 'src/utils/set';
import { fromAck, serialize } from 'src/utils/sync';
const FULL_SYNC = { needsFullSync: true, deleted: [], upserted: [] };
const SYNC_TYPES_ORDER = [
//
SyncRequestType.UsersV1,
];
const throwSessionRequired = () => {
throw new ForbiddenException('Sync endpoints cannot be used with API keys');
};
@Injectable()
export class SyncService extends BaseService {
getAcks(auth: AuthDto) {
const sessionId = auth.session?.id;
if (!sessionId) {
return throwSessionRequired();
}
return this.syncRepository.getCheckpoints(sessionId);
}
async setAcks(auth: AuthDto, dto: SyncAckSetDto) {
// TODO ack validation
const sessionId = auth.session?.id;
if (!sessionId) {
return throwSessionRequired();
}
const checkpoints: Insertable<SessionSyncCheckpoints>[] = [];
for (const ack of dto.acks) {
const { type } = fromAck(ack);
checkpoints.push({ sessionId, type, ack });
}
await this.syncRepository.upsertCheckpoints(checkpoints);
}
async deleteAcks(auth: AuthDto, dto: SyncAckDeleteDto) {
const sessionId = auth.session?.id;
if (!sessionId) {
return throwSessionRequired();
}
await this.syncRepository.deleteCheckpoints(sessionId, dto.types);
}
async stream(auth: AuthDto, response: Writable, dto: SyncStreamDto) {
const sessionId = auth.session?.id;
if (!sessionId) {
return throwSessionRequired();
}
const checkpoints = await this.syncRepository.getCheckpoints(sessionId);
const checkpointMap: Partial<Record<SyncEntityType, SyncAck>> = Object.fromEntries(
checkpoints.map(({ type, ack }) => [type, fromAck(ack)]),
);
// TODO pre-filter/sort list based on optimal sync order
for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) {
switch (type) {
case SyncRequestType.UsersV1: {
const deletes = this.syncRepository.getUserDeletes(checkpointMap[SyncEntityType.UserDeleteV1]);
for await (const { ackEpoch, ...data } of deletes) {
response.write(serialize({ type: SyncEntityType.UserDeleteV1, ackEpoch, ids: [data.userId], data }));
}
const upserts = this.syncRepository.getUserUpserts(checkpointMap[SyncEntityType.UserV1]);
for await (const { ackEpoch, ...data } of upserts) {
response.write(serialize({ type: SyncEntityType.UserV1, ackEpoch, ids: [data.id], data }));
}
break;
}
default: {
this.logger.warn(`Unsupported sync type: ${type}`);
break;
}
}
}
response.end();
}
async getFullSync(auth: AuthDto, dto: AssetFullSyncDto): Promise<AssetResponseDto[]> {
// mobile implementation is faster if this is a single id
const userId = dto.userId || auth.user.id;

View file

@ -4,6 +4,7 @@ import {
ImageFormat,
JobName,
QueueName,
SyncEntityType,
TranscodeTarget,
VideoCodec,
} from 'src/enum';
@ -409,3 +410,9 @@ export interface IBulkAsset {
addAssetIds: (id: string, assetIds: string[]) => Promise<void>;
removeAssetIds: (id: string, assetIds: string[]) => Promise<void>;
}
export type SyncAck = {
type: SyncEntityType;
ackEpoch: string;
ids: string[];
};

View file

@ -12,6 +12,7 @@ import { writeFileSync } from 'node:fs';
import path from 'node:path';
import { SystemConfig } from 'src/config';
import { CLIP_MODEL_INFO, serverVersion } from 'src/constants';
import { extraSyncModels } from 'src/dtos/sync.dto';
import { ImmichCookie, ImmichHeader, MetadataKey } from 'src/enum';
import { LoggingRepository } from 'src/repositories/logging.repository';
@ -245,6 +246,7 @@ export const useSwagger = (app: INestApplication, { write }: { write: boolean })
const options: SwaggerDocumentOptions = {
operationIdFactory: (controllerKey: string, methodKey: string) => methodKey,
extraModels: extraSyncModels,
};
const specification = SwaggerModule.createDocument(app, config, options);

30
server/src/utils/sync.ts Normal file
View file

@ -0,0 +1,30 @@
import { SyncItem } from 'src/dtos/sync.dto';
import { SyncEntityType } from 'src/enum';
import { SyncAck } from 'src/types';
type Impossible<K extends keyof any> = {
[P in K]: never;
};
type Exact<T, U extends T = T> = U & Impossible<Exclude<keyof U, keyof T>>;
export const fromAck = (ack: string): SyncAck => {
const [type, timestamp, ...ids] = ack.split('|');
return { type: type as SyncEntityType, ackEpoch: timestamp, ids };
};
export const toAck = ({ type, ackEpoch, ids }: SyncAck) => [type, ackEpoch, ...ids].join('|');
export const mapJsonLine = (object: unknown) => JSON.stringify(object) + '\n';
export const serialize = <T extends keyof SyncItem, D extends SyncItem[T]>({
type,
ackEpoch,
ids,
data,
}: {
type: T;
ackEpoch: string;
ids: string[];
data: Exact<SyncItem[T], D>;
}) => mapJsonLine({ type, data, ack: toAck({ type, ackEpoch, ids }) });

View file

@ -0,0 +1,13 @@
import { SyncRepository } from 'src/repositories/sync.repository';
import { RepositoryInterface } from 'src/types';
import { Mocked, vitest } from 'vitest';
export const newSyncRepositoryMock = (): Mocked<RepositoryInterface<SyncRepository>> => {
return {
getCheckpoints: vitest.fn(),
upsertCheckpoints: vitest.fn(),
deleteCheckpoints: vitest.fn(),
getUserUpserts: vitest.fn(),
getUserDeletes: vitest.fn(),
};
};

View file

@ -34,6 +34,7 @@ import { SessionRepository } from 'src/repositories/session.repository';
import { SharedLinkRepository } from 'src/repositories/shared-link.repository';
import { StackRepository } from 'src/repositories/stack.repository';
import { StorageRepository } from 'src/repositories/storage.repository';
import { SyncRepository } from 'src/repositories/sync.repository';
import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository';
import { TagRepository } from 'src/repositories/tag.repository';
import { TelemetryRepository } from 'src/repositories/telemetry.repository';
@ -75,6 +76,7 @@ import { newSessionRepositoryMock } from 'test/repositories/session.repository.m
import { newSharedLinkRepositoryMock } from 'test/repositories/shared-link.repository.mock';
import { newStackRepositoryMock } from 'test/repositories/stack.repository.mock';
import { newStorageRepositoryMock } from 'test/repositories/storage.repository.mock';
import { newSyncRepositoryMock } from 'test/repositories/sync.repository.mock';
import { newSystemMetadataRepositoryMock } from 'test/repositories/system-metadata.repository.mock';
import { newTagRepositoryMock } from 'test/repositories/tag.repository.mock';
import { ITelemetryRepositoryMock, newTelemetryRepositoryMock } from 'test/repositories/telemetry.repository.mock';
@ -178,6 +180,7 @@ export const newTestService = <T extends BaseService>(
const sharedLinkMock = newSharedLinkRepositoryMock();
const stackMock = newStackRepositoryMock();
const storageMock = newStorageRepositoryMock();
const syncMock = newSyncRepositoryMock();
const systemMock = newSystemMetadataRepositoryMock();
const tagMock = newTagRepositoryMock();
const telemetryMock = newTelemetryRepositoryMock();
@ -219,6 +222,7 @@ export const newTestService = <T extends BaseService>(
sharedLinkMock as RepositoryInterface<SharedLinkRepository> as SharedLinkRepository,
stackMock as RepositoryInterface<StackRepository> as StackRepository,
storageMock as RepositoryInterface<StorageRepository> as StorageRepository,
syncMock as RepositoryInterface<SyncRepository> as SyncRepository,
systemMock as RepositoryInterface<SystemMetadataRepository> as SystemMetadataRepository,
tagMock as RepositoryInterface<TagRepository> as TagRepository,
telemetryMock as unknown as TelemetryRepository,