Skip to content

Commit c781f2c

Browse files
committed
docs(sync): add code comments to import_company and import_job
1 parent 48666df commit c781f2c

2 files changed

Lines changed: 51 additions & 4 deletions

File tree

apps/server/service/src/sync/import_company.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ impl CompanyImporter {
2828
) -> Result<ImportResult, ImportError> {
2929
let start_time = std::time::Instant::now();
3030

31+
// 空数据处理
3132
if data.is_empty() {
3233
return Ok(ImportResult {
3334
success: true,
@@ -47,6 +48,7 @@ impl CompanyImporter {
4748

4849
let headers = &data[0];
4950

51+
// 验证表头
5052
let (valid, version, actual_version, lack_columns, warnings) =
5153
FileParser::validate_company_headers(headers);
5254
if !valid {
@@ -78,6 +80,7 @@ impl CompanyImporter {
7880
let total = rows.len();
7981
let now = Utc::now();
8082

83+
// 使用闭包事务,自动处理提交/回滚
8184
let result = conn
8285
.transaction::<_, _, DbErr>(|txn| {
8386
let mapping = mapping.clone();
@@ -88,6 +91,7 @@ impl CompanyImporter {
8891
let mut company_source_map: HashMap<String, CompanySourceActiveModel> =
8992
HashMap::new();
9093

94+
// 根据文件的rows构建company source列表
9195
let mut errors: Vec<ImportErrorType> = Vec::new();
9296
for (row_index, row) in rows.iter().enumerate() {
9397
let company_name = get_field_value(&mapping.name, row);
@@ -126,6 +130,7 @@ impl CompanyImporter {
126130
}
127131
}
128132

133+
// 如果有解析错误,直接返回
129134
if !errors.is_empty() {
130135
return Ok(ImportResult {
131136
success: false,
@@ -143,12 +148,14 @@ impl CompanyImporter {
143148
});
144149
}
145150

151+
// 根据company_source id获取已存在的数据
146152
let exists_company_source = Self::batch_query_company_sources(
147153
company_ids_from_data.into_iter().collect(),
148154
txn,
149155
)
150156
.await?;
151157

158+
// 过滤数据库中已存在的company_source记录
152159
let exists_company_source_ids: Vec<String> = exists_company_source
153160
.iter()
154161
.map(|item| item.id.clone())
@@ -160,8 +167,10 @@ impl CompanyImporter {
160167
let filter_company_source: Vec<entity::company_source::ActiveModel> =
161168
company_source_map.into_values().collect();
162169

170+
// 保存过滤后的company_source记录
163171
Self::batch_insert_company_sources(filter_company_source.clone(), txn).await?;
164172

173+
// 构建 company_id 到 company_source 的映射
165174
let mut filter_company_id_and_source_map = HashMap::new();
166175
let mut company_ids = Vec::new();
167176
for item in filter_company_source {
@@ -178,13 +187,15 @@ impl CompanyImporter {
178187
company_ids.push(company_id);
179188
}
180189

190+
// 查询已存在的company记录
181191
let exists_company = Self::batch_query_companies(company_ids, txn).await?;
182192

183193
let exists_company_map: HashMap<&str, &entity::company::Model> = exists_company
184194
.iter()
185195
.map(|item| (item.id.as_str(), item))
186196
.collect();
187197

198+
// 区分已存在和不存在company的记录
188199
let mut exists_company_source = Vec::new();
189200
let mut not_exists_company_source = Vec::new();
190201
for (_, company_id) in filter_company_id_and_source_map.values() {
@@ -197,6 +208,7 @@ impl CompanyImporter {
197208

198209
let mut insert_company = Vec::new();
199210

211+
// 如果company不存在,则进行插入逻辑
200212
for company_id in not_exists_company_source {
201213
if let Some((source_model, _)) =
202214
filter_company_id_and_source_map.get(&company_id)
@@ -212,6 +224,7 @@ impl CompanyImporter {
212224

213225
let mut update_company_list = Vec::new();
214226

227+
// 如果company已存在,则进行更新处理逻辑
215228
for company_id in exists_company_source {
216229
let (source_model, _) = filter_company_id_and_source_map
217230
.get(&company_id)
@@ -234,6 +247,7 @@ impl CompanyImporter {
234247
|e| DbErr::Query(sea_orm::RuntimeErr::Internal(e.to_string())),
235248
)?;
236249

250+
// 规则1: source_refresh_datetime更新时,从company_source重建company
237251
let existing_refresh = existing_company.source_refresh_datetime;
238252
let new_refresh = source_model.source_refresh_datetime;
239253
if let (Some(existing_dt), Some(new_dt)) = (existing_refresh, new_refresh)
@@ -252,8 +266,10 @@ impl CompanyImporter {
252266
let imported = insert_company.len();
253267
let updated = update_company_list.len();
254268

269+
// 批量插入新company
255270
Self::batch_insert_companies(insert_company, txn).await?;
256271

272+
// 更新已有company
257273
for item in update_company_list {
258274
item.update(txn).await?;
259275
}
@@ -283,6 +299,7 @@ impl CompanyImporter {
283299
Ok(result)
284300
}
285301

302+
// 根据company_source id批量查询
286303
async fn batch_query_company_sources<C>(
287304
ids: Vec<String>,
288305
conn: &C,
@@ -301,6 +318,7 @@ impl CompanyImporter {
301318
Ok(results)
302319
}
303320

321+
// 批量插入company_source记录
304322
async fn batch_insert_company_sources<C>(
305323
sources: Vec<entity::company_source::ActiveModel>,
306324
conn: &C,
@@ -316,6 +334,7 @@ impl CompanyImporter {
316334
Ok(())
317335
}
318336

337+
// 根据company id批量查询
319338
async fn batch_query_companies<C>(
320339
ids: Vec<String>,
321340
conn: &C,
@@ -334,6 +353,7 @@ impl CompanyImporter {
334353
Ok(results)
335354
}
336355

356+
// 批量插入company记录
337357
async fn batch_insert_companies<C>(
338358
companies: Vec<entity::company::ActiveModel>,
339359
conn: &C,
@@ -349,6 +369,7 @@ impl CompanyImporter {
349369
Ok(())
350370
}
351371

372+
// 根据CSV行构建company_source模型
352373
fn build_company_source(
353374
id: &str,
354375
company_id: &str,
@@ -408,6 +429,7 @@ impl CompanyImporter {
408429
update_datetime: ActiveValue::set(None),
409430
};
410431

432+
// 解析 source_refresh_datetime
411433
if let Some(dt_str) = get_string(row, mapping.source_refresh_datetime) {
412434
if let Ok(dt) = DateTime::parse_from_rfc3339(&dt_str) {
413435
model.source_refresh_datetime =
@@ -417,6 +439,7 @@ impl CompanyImporter {
417439
}
418440
}
419441

442+
// 解析 create_datetime
420443
if let Some(dt_str) = get_string(row, mapping.create_datetime)
421444
&& let Ok(dt) = DateTime::parse_from_rfc3339(&dt_str)
422445
{
@@ -433,6 +456,7 @@ impl CompanyImporter {
433456
Ok(model)
434457
}
435458

459+
// 根据company_source构建company模型
436460
fn build_company(
437461
source: &CompanySourceModel,
438462
update_datetime: &DateTime<FixedOffset>,
@@ -476,5 +500,4 @@ impl CompanyImporter {
476500
update_datetime: ActiveValue::set(Some(*update_datetime)),
477501
})
478502
}
479-
}
480-
503+
}

apps/server/service/src/sync/import_job.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ impl JobImporter {
2626
) -> Result<ImportResult, ImportError> {
2727
let start_time = std::time::Instant::now();
2828

29+
// 空数据处理
2930
if data.is_empty() {
3031
return Ok(ImportResult {
3132
success: true,
@@ -45,6 +46,7 @@ impl JobImporter {
4546

4647
let headers = &data[0];
4748

49+
// 验证表头
4850
let (valid, version, actual_version, lack_columns, warnings) =
4951
FileParser::validate_job_headers(headers);
5052
if !valid {
@@ -76,6 +78,7 @@ impl JobImporter {
7678
let total = rows.len();
7779
let now = Utc::now();
7880

81+
// 使用闭包事务,自动处理提交/回滚
7982
let result = conn
8083
.transaction::<_, _, DbErr>(|txn| {
8184
let mapping = mapping.clone();
@@ -85,6 +88,7 @@ impl JobImporter {
8588
let mut job_ids_from_data: HashSet<String> = HashSet::new();
8689
let mut job_source_map: HashMap<String, JobSourceActiveModel> = HashMap::new();
8790

91+
// 根据文件的rows构建job source列表
8892
let mut errors: Vec<ImportErrorType> = Vec::new();
8993
for (row_index, row) in rows.iter().enumerate() {
9094
let job_id = get_field_value(&mapping.job_id, row);
@@ -117,6 +121,7 @@ impl JobImporter {
117121
}
118122
}
119123

124+
// 如果有解析错误,直接返回
120125
if !errors.is_empty() {
121126
return Ok(ImportResult {
122127
success: false,
@@ -134,10 +139,12 @@ impl JobImporter {
134139
});
135140
}
136141

142+
// 根据data job_source id获取已存在数据中的job_source记录
137143
let exists_job_source =
138144
Self::batch_query_job_sources(job_ids_from_data.into_iter().collect(), txn)
139145
.await?;
140146

147+
// 过滤数据库中的job_source记录
141148
let exists_job_source_ids: Vec<String> = exists_job_source
142149
.iter()
143150
.map(|item| item.id.clone())
@@ -149,11 +156,13 @@ impl JobImporter {
149156
let filter_job_source: Vec<entity::job_source::ActiveModel> =
150157
job_source_map.into_values().collect();
151158

159+
// 保存过滤后的job_source记录
152160
Self::batch_insert_job_sources(filter_job_source.clone(), txn).await?;
153161

154162
let mut filter_job_id_and_job_source_map = HashMap::new();
155163
let mut job_ids = Vec::new();
156164

165+
// 根据过滤后的job source更新或插入对应的job记录
157166
for item in filter_job_source {
158167
let model = item.try_into_model().map_err(|e| {
159168
DbErr::Query(sea_orm::RuntimeErr::Internal(e.to_string()))
@@ -167,13 +176,15 @@ impl JobImporter {
167176
job_ids.push(job_id);
168177
}
169178

179+
// 查询job记录(带锁)
170180
let exists_job = Self::batch_query_jobs(job_ids, txn).await?;
171181

172182
let exists_job_id_model_map: HashMap<&str, &entity::job::Model> = exists_job
173183
.iter()
174184
.map(|item| (item.id.as_str(), item))
175185
.collect();
176186

187+
// 区分已存在和不存在job的记录
177188
let mut exists_job_source = Vec::new();
178189
let mut not_exists_job_source = Vec::new();
179190
for (id, job_source) in &filter_job_id_and_job_source_map {
@@ -186,6 +197,7 @@ impl JobImporter {
186197

187198
let mut insert_job = Vec::new();
188199

200+
// 如果job不存在,则进行插入逻辑
189201
for item in not_exists_job_source {
190202
let now_fixed = now.fixed_offset();
191203
let job = Self::build_job(item, &now_fixed, &now_fixed).map_err(|e| {
@@ -196,6 +208,7 @@ impl JobImporter {
196208

197209
let mut update_job_list = Vec::new();
198210

211+
// 如果job已存在,则进行更新处理逻辑
199212
for item in exists_job_source {
200213
let id = item.job_id.clone().ok_or_else(|| {
201214
DbErr::Query(sea_orm::RuntimeErr::Internal(
@@ -213,6 +226,7 @@ impl JobImporter {
213226
DbErr::Query(sea_orm::RuntimeErr::Internal(e.to_string()))
214227
})?;
215228

229+
// 规则1: 更新的数据发布时间,如果job source的发布时间更新,则重建job
216230
if update_job.publish_datetime.ok_or_else(|| {
217231
DbErr::Query(sea_orm::RuntimeErr::Internal(
218232
"job publish_datetime is empty".to_string(),
@@ -241,6 +255,7 @@ impl JobImporter {
241255
})?;
242256
}
243257

258+
// 规则2: 获得公司全称,如果原job的公司名称不是全称,而job source的是全称,那么更新
244259
if !job.is_full_company_name.ok_or_else(|| {
245260
DbErr::Query(sea_orm::RuntimeErr::Internal(
246261
"job is_full_company_name is empty".to_string(),
@@ -254,6 +269,8 @@ impl JobImporter {
254269
update_job.company_name = job_source.company_name.clone();
255270
}
256271

272+
// 规则3: 更早的首次扫描时间,如果原job source的首次扫描时间比job的更早,
273+
// 那么更新job source的首次扫描时间到job
257274
if job_source.first_scan_datetime.ok_or_else(|| {
258275
DbErr::Query(sea_orm::RuntimeErr::Internal(
259276
"job_source first_scan_datetime is empty".to_string(),
@@ -275,8 +292,10 @@ impl JobImporter {
275292
let imported = insert_job.len();
276293
let updated = update_job_list.len();
277294

295+
// 批量插入新job
278296
Self::batch_insert_jobs(insert_job, txn).await?;
279297

298+
// 更新已有job
280299
for item in update_job_list {
281300
item.update(txn).await?;
282301
}
@@ -306,6 +325,7 @@ impl JobImporter {
306325
Ok(result)
307326
}
308327

328+
// 根据job_source id批量查询
309329
async fn batch_query_job_sources<C>(
310330
ids: Vec<String>,
311331
conn: &C,
@@ -324,6 +344,7 @@ impl JobImporter {
324344
Ok(results)
325345
}
326346

347+
// 批量插入job_source记录
327348
async fn batch_insert_job_sources<C>(
328349
sources: Vec<entity::job_source::ActiveModel>,
329350
conn: &C,
@@ -339,6 +360,7 @@ impl JobImporter {
339360
Ok(())
340361
}
341362

363+
// 根据job id批量查询(带锁)
342364
async fn batch_query_jobs<C>(
343365
ids: Vec<String>,
344366
conn: &C,
@@ -358,6 +380,7 @@ impl JobImporter {
358380
Ok(results)
359381
}
360382

383+
// 批量插入job记录
361384
async fn batch_insert_jobs<C>(
362385
jobs: Vec<entity::job::ActiveModel>,
363386
conn: &C,
@@ -373,6 +396,7 @@ impl JobImporter {
373396
Ok(())
374397
}
375398

399+
// 根据CSV行构建job_source模型
376400
fn build_job_source(
377401
id: &str,
378402
job_id: &str,
@@ -439,6 +463,7 @@ impl JobImporter {
439463
})
440464
}
441465

466+
// 根据job_source构建job模型
442467
fn build_job(
443468
source: &JobSourceModel,
444469
update_datetime: &DateTime<FixedOffset>,
@@ -479,5 +504,4 @@ impl JobImporter {
479504
publish_datetime: ActiveValue::set(source.publish_datetime),
480505
})
481506
}
482-
}
483-
507+
}

0 commit comments

Comments
 (0)