Commit 92ccc7c8 authored by 375242562@qq.com's avatar 375242562@qq.com

feat: 同步批次号 — 每次同步生成批次记录,支持按批次删除患者数据

后端:
- 新增 SyncBatch 模型(sync_batches 表): 记录每次同步的来源、模式、新增/更新数量
- Patient 模型增加 sync_batch_id 字段,打上每次同步的批次号
- sync_service: 每次同步生成 UUID 批次号,新增/更新患者均标记,结束后写入批次记录
- GET /sync/batches: 列出所有批次(含当前存活患者数)
- DELETE /sync/batches/{id}: 删除该批次所有患者数据及批次记录
- SQLite 迁移: patients.sync_batch_id 列 + sync_batches 表

前端:
- DataSyncPage 底部新增「同步批次记录」表格
  展示批次号(hover显示完整UUID)、来源任务、模式、获取/新增/更新/当前患者数、时间
  删除按钮: 当前患者数=0时禁用,点击弹确认框后批量删除
- syncService 新增 listBatches / deleteBatch 方法
- 同步成功提示带上批次号前8位
parent b6692c94
...@@ -9,8 +9,11 @@ from pydantic import BaseModel ...@@ -9,8 +9,11 @@ from pydantic import BaseModel
from app.database import get_db from app.database import get_db
from app.core.deps import get_current_user from app.core.deps import get_current_user
from app.models.user import User from app.models.user import User
from sqlalchemy import delete as sa_delete, func
from app.models.sync_source import SyncSource from app.models.sync_source import SyncSource
from app.models.connection_profile import ConnectionProfile from app.models.connection_profile import ConnectionProfile
from app.models.sync_batch import SyncBatch
from app.models.patient import Patient
from app.services.sync_service import sync_source as do_sync from app.services.sync_service import sync_source as do_sync
from app.services.sync_adapters import get_adapter, _sync_test_connection, _list_tables from app.services.sync_adapters import get_adapter, _sync_test_connection, _list_tables
...@@ -281,6 +284,79 @@ async def sync_status( ...@@ -281,6 +284,79 @@ async def sync_status(
} }
class SyncBatchResponse(BaseModel):
id: str
source_id: str
source_name: str
sync_mode: str
created_count: int
updated_count: int
total_fetched: int
patient_count: int
created_at: str
@router.get("/batches", response_model=list[SyncBatchResponse])
async def list_batches(
skip: int = Query(0, ge=0),
limit: int = Query(20, le=100),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List all sync batches ordered by creation time descending."""
result = await db.execute(
select(SyncBatch).order_by(SyncBatch.created_at.desc()).offset(skip).limit(limit)
)
batches = result.scalars().all()
responses = []
for b in batches:
count = await db.scalar(
select(func.count()).select_from(Patient).where(Patient.sync_batch_id == b.id)
)
responses.append(SyncBatchResponse(
id=b.id,
source_id=b.source_id,
source_name=b.source_name,
sync_mode=b.sync_mode,
created_count=b.created_count,
updated_count=b.updated_count,
total_fetched=b.total_fetched,
patient_count=count or 0,
created_at=b.created_at.isoformat(),
))
return responses
@router.delete("/batches/{batch_id}", status_code=200)
async def delete_batch(
batch_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Delete all patients belonging to a sync batch, then delete the batch record."""
batch = await db.get(SyncBatch, batch_id)
if not batch:
raise HTTPException(status_code=404, detail="批次记录不存在")
# Count patients before deletion for response
patient_count = await db.scalar(
select(func.count()).select_from(Patient).where(Patient.sync_batch_id == batch_id)
)
# Delete patients in this batch
await db.execute(sa_delete(Patient).where(Patient.sync_batch_id == batch_id))
# Delete batch record
await db.delete(batch)
await db.commit()
return {
"batch_id": batch_id,
"deleted_patients": patient_count or 0,
"message": f"已删除批次 {batch_id[:8]}... 的 {patient_count} 条患者数据",
}
@router.get("/list-tables") @router.get("/list-tables")
async def list_tables( async def list_tables(
connection_profile_id: str = Query(..., description="ID of the connection profile"), connection_profile_id: str = Query(..., description="ID of the connection profile"),
......
...@@ -16,3 +16,4 @@ from app.models.batch_matching import BatchMatchingJob, BatchJobStatus # noqa: ...@@ -16,3 +16,4 @@ from app.models.batch_matching import BatchMatchingJob, BatchJobStatus # noqa:
from app.models.sync_source import SyncSource # noqa: F401 from app.models.sync_source import SyncSource # noqa: F401
from app.models.data_domain_type import DataDomainType # noqa: F401 from app.models.data_domain_type import DataDomainType # noqa: F401
from app.models.connection_profile import ConnectionProfile # noqa: F401 from app.models.connection_profile import ConnectionProfile # noqa: F401
from app.models.sync_batch import SyncBatch # noqa: F401
...@@ -41,6 +41,7 @@ class Patient(Base): ...@@ -41,6 +41,7 @@ class Patient(Base):
# External system tracking # External system tracking
source_system: Mapped[str] = mapped_column(String(100), nullable=True) # "HIS", "LIS", "PACS", "manual" source_system: Mapped[str] = mapped_column(String(100), nullable=True) # "HIS", "LIS", "PACS", "manual"
external_id: Mapped[str] = mapped_column(String(255), nullable=True) # ID in the source system external_id: Mapped[str] = mapped_column(String(255), nullable=True) # ID in the source system
sync_batch_id: Mapped[str] = mapped_column(String(36), nullable=True, index=True) # batch ID of last sync
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow) created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, DateTime
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class SyncBatch(Base):
__tablename__ = "sync_batches"
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
source_id: Mapped[str] = mapped_column(String(36))
source_name: Mapped[str] = mapped_column(String(255))
sync_mode: Mapped[str] = mapped_column(String(20), default="full")
created_count: Mapped[int] = mapped_column(Integer, default=0)
updated_count: Mapped[int] = mapped_column(Integer, default=0)
total_fetched: Mapped[int] = mapped_column(Integer, default=0)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
...@@ -6,6 +6,7 @@ from sqlalchemy import select ...@@ -6,6 +6,7 @@ from sqlalchemy import select
from app.models.patient import Patient, GenderEnum from app.models.patient import Patient, GenderEnum
from app.models.sync_source import SyncSource from app.models.sync_source import SyncSource
from app.models.connection_profile import ConnectionProfile from app.models.connection_profile import ConnectionProfile
from app.models.sync_batch import SyncBatch
from app.services.sync_adapters import get_adapter, PatientSyncData from app.services.sync_adapters import get_adapter, PatientSyncData
...@@ -122,6 +123,9 @@ async def sync_source(source_id: str, db: AsyncSession) -> dict: ...@@ -122,6 +123,9 @@ async def sync_source(source_id: str, db: AsyncSession) -> dict:
since = source.last_sync_at if (source.sync_mode == "incremental" and source.last_sync_at) else None since = source.last_sync_at if (source.sync_mode == "incremental" and source.last_sync_at) else None
patients_data: list[PatientSyncData] = await adapter.fetch_patients(since=since) patients_data: list[PatientSyncData] = await adapter.fetch_patients(since=since)
# Generate a unique batch ID for this sync run
batch_id = str(uuid.uuid4())
created = 0 created = 0
updated = 0 updated = 0
...@@ -176,6 +180,7 @@ async def sync_source(source_id: str, db: AsyncSession) -> dict: ...@@ -176,6 +180,7 @@ async def sync_source(source_id: str, db: AsyncSession) -> dict:
existing.pathology_report = pdata["pathology_report"] existing.pathology_report = pdata["pathology_report"]
existing.source_system = pdata.get("source_system", source.source_type) existing.source_system = pdata.get("source_system", source.source_type)
existing.external_id = pdata.get("external_id") existing.external_id = pdata.get("external_id")
existing.sync_batch_id = batch_id
existing.updated_at = datetime.utcnow() existing.updated_at = datetime.utcnow()
updated += 1 updated += 1
else: else:
...@@ -195,10 +200,23 @@ async def sync_source(source_id: str, db: AsyncSession) -> dict: ...@@ -195,10 +200,23 @@ async def sync_source(source_id: str, db: AsyncSession) -> dict:
fhir_medications=pdata.get("fhir_medications", []), fhir_medications=pdata.get("fhir_medications", []),
source_system=pdata.get("source_system", source.source_type), source_system=pdata.get("source_system", source.source_type),
external_id=pdata.get("external_id"), external_id=pdata.get("external_id"),
sync_batch_id=batch_id,
) )
db.add(new_patient) db.add(new_patient)
created += 1 created += 1
# Save batch record
batch = SyncBatch(
id=batch_id,
source_id=source_id,
source_name=source.name,
sync_mode=source.sync_mode or "full",
created_count=created,
updated_count=updated,
total_fetched=len(patients_data),
)
db.add(batch)
source.last_sync_at = datetime.utcnow() source.last_sync_at = datetime.utcnow()
source.last_sync_count = len(patients_data) source.last_sync_count = len(patients_data)
await db.commit() await db.commit()
...@@ -207,6 +225,7 @@ async def sync_source(source_id: str, db: AsyncSession) -> dict: ...@@ -207,6 +225,7 @@ async def sync_source(source_id: str, db: AsyncSession) -> dict:
"source_id": source_id, "source_id": source_id,
"source_name": source.name, "source_name": source.name,
"sync_mode": source.sync_mode, "sync_mode": source.sync_mode,
"batch_id": batch_id,
"patients_fetched": len(patients_data), "patients_fetched": len(patients_data),
"created": created, "created": created,
"updated": updated, "updated": updated,
......
...@@ -6,7 +6,9 @@ import { ...@@ -6,7 +6,9 @@ import {
FormControlLabel, Switch, Snackbar, Alert, CircularProgress, FormControlLabel, Switch, Snackbar, Alert, CircularProgress,
Grid, Divider, Tooltip, Stepper, Step, StepLabel, Grid, Divider, Tooltip, Stepper, Step, StepLabel,
List, ListItem, ListItemIcon, ListItemText, ListItemSecondaryAction, List, ListItem, ListItemIcon, ListItemText, ListItemSecondaryAction,
Table, TableHead, TableBody, TableRow, TableCell,
} from '@mui/material' } from '@mui/material'
import LayersIcon from '@mui/icons-material/Layers'
import SyncIcon from '@mui/icons-material/Sync' import SyncIcon from '@mui/icons-material/Sync'
import AddIcon from '@mui/icons-material/Add' import AddIcon from '@mui/icons-material/Add'
import DeleteIcon from '@mui/icons-material/Delete' import DeleteIcon from '@mui/icons-material/Delete'
...@@ -21,7 +23,7 @@ import RefreshIcon from '@mui/icons-material/Refresh' ...@@ -21,7 +23,7 @@ import RefreshIcon from '@mui/icons-material/Refresh'
import ArrowForwardIcon from '@mui/icons-material/ArrowForward' import ArrowForwardIcon from '@mui/icons-material/ArrowForward'
import TableChartIcon from '@mui/icons-material/TableChart' import TableChartIcon from '@mui/icons-material/TableChart'
import { syncService, SyncSource, SyncSourceCreate, TARGET_PATIENT_FIELDS, FieldMapping, PreviewResult } from '../services/syncService' import { syncService, SyncSource, SyncSourceCreate, SyncBatch, TARGET_PATIENT_FIELDS, FieldMapping, PreviewResult } from '../services/syncService'
import { connectionProfileService, ConnectionProfile, ConnectionProfileCreate } from '../services/connectionProfileService' import { connectionProfileService, ConnectionProfile, ConnectionProfileCreate } from '../services/connectionProfileService'
// ───────────────────────────────────────────────────────────────────────────── // ─────────────────────────────────────────────────────────────────────────────
...@@ -867,15 +869,21 @@ export function DataSyncPage() { ...@@ -867,15 +869,21 @@ export function DataSyncPage() {
const [connDialogOpen, setConnDialogOpen] = useState(false) const [connDialogOpen, setConnDialogOpen] = useState(false)
const [editProfile, setEditProfile] = useState<ConnectionProfile | null>(null) const [editProfile, setEditProfile] = useState<ConnectionProfile | null>(null)
// 批次历史
const [batches, setBatches] = useState<SyncBatch[]>([])
const [deletingBatch, setDeletingBatch] = useState<string | null>(null)
// 加载数据 // 加载数据
const loadData = useCallback(async () => { const loadData = useCallback(async () => {
try { try {
const [sourcesRes, profilesRes] = await Promise.all([ const [sourcesRes, profilesRes, batchesRes] = await Promise.all([
syncService.listSources(), syncService.listSources(),
connectionProfileService.list(), connectionProfileService.list(),
syncService.listBatches({ limit: 50 }),
]) ])
setSources(sourcesRes) setSources(sourcesRes)
setProfiles(profilesRes) setProfiles(profilesRes)
setBatches(batchesRes)
} catch (e: any) { } catch (e: any) {
showSnack('error', '加载数据失败') showSnack('error', '加载数据失败')
} finally { } finally {
...@@ -890,7 +898,7 @@ export function DataSyncPage() { ...@@ -890,7 +898,7 @@ export function DataSyncPage() {
setSyncing(s => ({ ...s, [source.id]: true })) setSyncing(s => ({ ...s, [source.id]: true }))
try { try {
const result = await syncService.runSync(source.id) const result = await syncService.runSync(source.id)
showSnack('success', `同步完成:新增 ${result.created} 条,更新 ${result.updated} `) showSnack('success', `同步完成:新增 ${result.created} 条,更新 ${result.updated} ,批次号 ${result.batch_id.slice(0, 8)}...`)
loadData() loadData()
} catch (e: any) { } catch (e: any) {
showSnack('error', e?.response?.data?.detail || '同步失败') showSnack('error', e?.response?.data?.detail || '同步失败')
...@@ -899,6 +907,21 @@ export function DataSyncPage() { ...@@ -899,6 +907,21 @@ export function DataSyncPage() {
} }
} }
// 按批次删除患者数据
const handleDeleteBatch = async (batch: SyncBatch) => {
if (!window.confirm(`确定删除批次「${batch.id.slice(0, 8)}...」的 ${batch.patient_count} 条患者数据吗?\n\n此操作不可撤销,与该批次相关的匹配记录不会被删除。`)) return
setDeletingBatch(batch.id)
try {
const result = await syncService.deleteBatch(batch.id)
showSnack('success', result.message)
loadData()
} catch (e: any) {
showSnack('error', e?.response?.data?.detail || '删除失败')
} finally {
setDeletingBatch(null)
}
}
// 删除同步任务 // 删除同步任务
const handleDeleteSource = async (source: SyncSource) => { const handleDeleteSource = async (source: SyncSource) => {
if (!window.confirm(`确定删除同步任务「${source.name}」吗?`)) return if (!window.confirm(`确定删除同步任务「${source.name}」吗?`)) return
...@@ -1126,6 +1149,99 @@ export function DataSyncPage() { ...@@ -1126,6 +1149,99 @@ export function DataSyncPage() {
</Grid> </Grid>
)} )}
{/* 同步批次历史 */}
{!loading && (
<Card sx={{ mt: 3 }}>
<CardContent sx={{ pb: 1 }}>
<Stack direction="row" alignItems="center" spacing={1}>
<LayersIcon color="action" fontSize="small" />
<Typography variant="subtitle1" fontWeight={600}>
同步批次记录
</Typography>
<Typography variant="caption" color="text.secondary">
每次同步产生一个批次号,可按批次删除该批次导入的所有患者数据
</Typography>
</Stack>
</CardContent>
<Divider />
{batches.length === 0 ? (
<Box sx={{ py: 4, textAlign: 'center' }}>
<LayersIcon sx={{ fontSize: 40, color: 'text.disabled', mb: 1 }} />
<Typography variant="body2" color="text.secondary">暂无同步记录,执行同步任务后将在此显示批次</Typography>
</Box>
) : (
<Table size="small">
<TableHead>
<TableRow sx={{ '& th': { fontWeight: 600, bgcolor: 'grey.50' } }}>
<TableCell>批次号</TableCell>
<TableCell>来源任务</TableCell>
<TableCell>同步模式</TableCell>
<TableCell>获取记录</TableCell>
<TableCell>新增</TableCell>
<TableCell>更新</TableCell>
<TableCell>当前患者数</TableCell>
<TableCell>同步时间</TableCell>
<TableCell align="right">操作</TableCell>
</TableRow>
</TableHead>
<TableBody>
{batches.map(batch => (
<TableRow key={batch.id} hover>
<TableCell>
<Tooltip title={batch.id}>
<Typography variant="caption" sx={{ fontFamily: 'monospace', cursor: 'help' }}>
{batch.id.slice(0, 8)}...
</Typography>
</Tooltip>
</TableCell>
<TableCell>
<Typography variant="body2" noWrap sx={{ maxWidth: 180 }}>{batch.source_name}</Typography>
</TableCell>
<TableCell>
<Chip
label={batch.sync_mode === 'incremental' ? '增量' : '全量'}
size="small"
color={batch.sync_mode === 'incremental' ? 'info' : 'default'}
variant="outlined"
/>
</TableCell>
<TableCell>{batch.total_fetched}</TableCell>
<TableCell>
<Typography variant="body2" color="success.main">+{batch.created_count}</Typography>
</TableCell>
<TableCell>
<Typography variant="body2" color="info.main">↑{batch.updated_count}</Typography>
</TableCell>
<TableCell>
<Chip label={batch.patient_count} size="small" color={batch.patient_count > 0 ? 'primary' : 'default'} />
</TableCell>
<TableCell>
<Typography variant="caption" color="text.secondary">
{formatDate(batch.created_at)}
</Typography>
</TableCell>
<TableCell align="right">
<Tooltip title={batch.patient_count === 0 ? '该批次患者已全部删除' : `删除该批次 ${batch.patient_count} 条患者数据`}>
<span>
<IconButton
size="small"
color="error"
disabled={batch.patient_count === 0 || deletingBatch === batch.id}
onClick={() => handleDeleteBatch(batch)}
>
{deletingBatch === batch.id ? <CircularProgress size={16} /> : <DeleteIcon fontSize="small" />}
</IconButton>
</span>
</Tooltip>
</TableCell>
</TableRow>
))}
</TableBody>
</Table>
)}
</Card>
)}
{/* 弹窗 */} {/* 弹窗 */}
<SyncTaskDialog <SyncTaskDialog
open={taskDialogOpen} open={taskDialogOpen}
......
...@@ -61,11 +61,24 @@ export interface SyncRunResult { ...@@ -61,11 +61,24 @@ export interface SyncRunResult {
source_id: string source_id: string
source_name: string source_name: string
sync_mode: string sync_mode: string
batch_id: string
patients_fetched: number patients_fetched: number
created: number created: number
updated: number updated: number
} }
export interface SyncBatch {
id: string
source_id: string
source_name: string
sync_mode: string
created_count: number
updated_count: number
total_fetched: number
patient_count: number
created_at: string
}
export interface PreviewRequest { export interface PreviewRequest {
source_type: string source_type: string
sql_query?: string sql_query?: string
...@@ -118,4 +131,10 @@ export const syncService = { ...@@ -118,4 +131,10 @@ export const syncService = {
listTables: (profileId: string) => listTables: (profileId: string) =>
api.get<{ tables: string[] }>(`/sync/list-tables?connection_profile_id=${profileId}`).then(r => r.data.tables), api.get<{ tables: string[] }>(`/sync/list-tables?connection_profile_id=${profileId}`).then(r => r.data.tables),
listBatches: (params?: { skip?: number; limit?: number }) =>
api.get<SyncBatch[]>('/sync/batches', { params }).then(r => r.data),
deleteBatch: (batchId: string) =>
api.delete<{ deleted_patients: number; message: string }>(`/sync/batches/${batchId}`).then(r => r.data),
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment