|
|
|
@@ -1,36 +1,36 @@ |
|
|
|
package com.ruoyi.platform.service.impl; |
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollectionUtil; |
|
|
|
import cn.hutool.core.text.csv.CsvData; |
|
|
|
import cn.hutool.core.text.csv.CsvReader; |
|
|
|
import cn.hutool.core.text.csv.CsvUtil; |
|
|
|
import cn.hutool.core.date.DateUtil; |
|
|
|
import cn.hutool.core.util.IdUtil; |
|
|
|
import cn.hutool.core.util.ObjectUtil; |
|
|
|
import cn.hutool.json.JSONArray; |
|
|
|
import cn.hutool.json.JSONUtil; |
|
|
|
import com.ruoyi.common.core.enums.KgStatus; |
|
|
|
import com.ruoyi.common.core.enums.KgUpdateMethod; |
|
|
|
import com.ruoyi.platform.domain.kg.KnowledgeGraphVersion; |
|
|
|
import com.ruoyi.platform.domain.kg.dto.KgInfoIdDTO; |
|
|
|
import com.ruoyi.platform.domain.kg.dto.KgVersionInsertDTO; |
|
|
|
import com.ruoyi.platform.mapper.kg.KnowledgeGraphVersionDao; |
|
|
|
import com.ruoyi.platform.service.KgVersionService; |
|
|
|
import com.ruoyi.platform.service.MinioService; |
|
|
|
import com.ruoyi.platform.service.kg.FriendsWith; |
|
|
|
import com.ruoyi.platform.service.kg.Person; |
|
|
|
import com.ruoyi.platform.service.kg.PersonRepository; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.springframework.core.io.InputStreamResource; |
|
|
|
import org.springframework.http.ResponseEntity; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.springframework.transaction.annotation.Transactional; |
|
|
|
import org.springframework.web.multipart.MultipartFile; |
|
|
|
|
|
|
|
import java.io.InputStreamReader; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.HashMap; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.io.InputStream; |
|
|
|
import java.util.*; |
|
|
|
import java.util.zip.CRC32; |
|
|
|
|
|
|
|
import com.ruoyi.platform.domain.CsvParser; |
|
|
|
|
|
|
|
@Service |
|
|
|
public class KgVersionServiceImpl implements KgVersionService { |
|
|
|
|
|
|
|
@@ -40,6 +40,12 @@ public class KgVersionServiceImpl implements KgVersionService { |
|
|
|
@Autowired |
|
|
|
private PersonRepository personRepository; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private MinioService minioService; |
|
|
|
|
|
|
|
@Value("${minio.bucketName}") |
|
|
|
public String bucketName; |
|
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(KgVersionServiceImpl.class); |
|
|
|
|
|
|
|
@Override |
|
|
|
@@ -48,74 +54,42 @@ public class KgVersionServiceImpl implements KgVersionService { |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void uploadCsv(MultipartFile file, String kgId) { |
|
|
|
try (InputStreamReader reader = new InputStreamReader(file.getInputStream())) { |
|
|
|
// 创建 CsvReader |
|
|
|
CsvReader csvReader = CsvUtil.getReader(); |
|
|
|
// 读取所有行 |
|
|
|
CsvData rows = csvReader.read(reader); |
|
|
|
|
|
|
|
KnowledgeGraphVersion knowledgeGraphVersion = new KnowledgeGraphVersion(); |
|
|
|
knowledgeGraphVersion.setKgId(kgId); |
|
|
|
knowledgeGraphVersion.setVersion(maxVersion(kgId)); |
|
|
|
knowledgeGraphVersion.setName(file.getOriginalFilename()); |
|
|
|
knowledgeGraphVersion.setTransactionId(IdUtil.simpleUUID()); |
|
|
|
knowledgeGraphVersion.setStatus(KgStatus.OFFLINE.getCode()); |
|
|
|
knowledgeGraphVersion.setContent(JSONUtil.toJsonStr(rows.getRows())); |
|
|
|
knowledgeGraphVersion.setUpdateMethod(KgUpdateMethod.PENDING_UPDATE.getCode()); |
|
|
|
knowledgeGraphVersionDao.insert(knowledgeGraphVersion); |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("e={}", e.getMessage()); |
|
|
|
} |
|
|
|
public void insert(KgVersionInsertDTO kgVersionInsertDTO) { |
|
|
|
String path = kgVersionInsertDTO.getPath(); |
|
|
|
String kgId = kgVersionInsertDTO.getKgId(); |
|
|
|
KnowledgeGraphVersion knowledgeGraphVersion = new KnowledgeGraphVersion(); |
|
|
|
knowledgeGraphVersion.setKgId(kgId); |
|
|
|
knowledgeGraphVersion.setVersion(maxVersion(kgId)); |
|
|
|
knowledgeGraphVersion.setName(path.replaceAll(".*/([^/]+)$", "$1")); |
|
|
|
knowledgeGraphVersion.setTransactionId(IdUtil.simpleUUID()); |
|
|
|
knowledgeGraphVersion.setStatus(KgStatus.OFFLINE.getCode()); |
|
|
|
knowledgeGraphVersion.setContent(path); |
|
|
|
knowledgeGraphVersion.setUpdateMethod(KgUpdateMethod.PENDING_UPDATE.getCode()); |
|
|
|
knowledgeGraphVersionDao.insert(knowledgeGraphVersion); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public String uploadCsv(MultipartFile file) throws Exception { |
|
|
|
String path = "/knowledge-graph/" + DateUtil.now() + "/" + file.getOriginalFilename(); |
|
|
|
minioService.uploadFile(bucketName, path, file); |
|
|
|
return path; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@Transactional |
|
|
|
public void fullUpdate(Long id) { |
|
|
|
KnowledgeGraphVersion knowledgeGraphVersion = knowledgeGraphVersionDao.queryById(id); |
|
|
|
String transactionId = knowledgeGraphVersion.getTransactionId(); |
|
|
|
Integer version = knowledgeGraphVersion.getVersion(); |
|
|
|
JSONArray neo4jContents = JSONUtil.parseArray(knowledgeGraphVersion.getContent()); |
|
|
|
if (CollectionUtil.isEmpty(neo4jContents)||neo4jContents.size()<=1){ |
|
|
|
return; |
|
|
|
KnowledgeGraphVersion currentVersion = knowledgeGraphVersionDao.queryById(id); |
|
|
|
String transactionId = currentVersion.getTransactionId(); |
|
|
|
Integer version = currentVersion.getVersion(); |
|
|
|
KnowledgeGraphVersion onlineVersion = knowledgeGraphVersionDao.onlineFind(); |
|
|
|
if (ObjectUtil.isNotNull(onlineVersion)) { |
|
|
|
//旧版在线变离线 |
|
|
|
changeStatus(onlineVersion.getId(), KgStatus.OFFLINE.getCode(), null); |
|
|
|
} |
|
|
|
//去掉表头 |
|
|
|
neo4jContents.remove(0); |
|
|
|
// 用于存储已解析的 Person 节点 |
|
|
|
Map<String, Person> personMap = new HashMap<>(); |
|
|
|
neo4jContents.forEach(item -> { |
|
|
|
JSONArray lineData = JSONUtil.parseArray(item); |
|
|
|
String name = String.valueOf(lineData.get(0)); |
|
|
|
String age = String.valueOf((lineData.get(1))); |
|
|
|
String relationType = String.valueOf(lineData.get(2)); |
|
|
|
String relatedPerson = String.valueOf(lineData.get(3)); |
|
|
|
String since = lineData.get(4) != null ? String.valueOf(lineData.get(4)) : "0"; |
|
|
|
|
|
|
|
// 创建或获取 Person 节点 |
|
|
|
Person person = personMap.computeIfAbsent(name, k -> new Person(name, age, new ArrayList<>())); |
|
|
|
person.setVersion(version); |
|
|
|
person.setTransactionId(transactionId); |
|
|
|
|
|
|
|
// 如果存在关系,则创建关系 |
|
|
|
if (relationType != null && relatedPerson != null) { |
|
|
|
Person related = personMap.computeIfAbsent(relatedPerson, k -> new Person(relatedPerson, "0", new ArrayList<>())); |
|
|
|
FriendsWith relation = new FriendsWith(related, since); |
|
|
|
related.setTransactionId(transactionId); |
|
|
|
related.setVersion(version); |
|
|
|
relation.setTransactionId(transactionId); |
|
|
|
relation.setVersion(version); |
|
|
|
relation.setRelationType(relationType); // 动态设置关系类型 |
|
|
|
person.getRelations().add(relation); |
|
|
|
} |
|
|
|
// 保存 Person 节点 |
|
|
|
personRepository.save(person); |
|
|
|
}); |
|
|
|
|
|
|
|
KnowledgeGraphVersion online = new KnowledgeGraphVersion(); |
|
|
|
online.setStatus(KgStatus.ONLINE.getCode()); |
|
|
|
online.setUpdateMethod(KgUpdateMethod.INCREMENTAL_UPDATE.getCode()); |
|
|
|
online.setId(id); |
|
|
|
knowledgeGraphVersionDao.update(online); |
|
|
|
//当前版本变在线 |
|
|
|
changeStatus(currentVersion.getId(), KgStatus.ONLINE.getCode(), KgUpdateMethod.FULL_UPDATE.getCode()); |
|
|
|
//解析表格数据保存到neo4j |
|
|
|
saveToNeo4j(currentVersion.getContent(), version, transactionId); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@@ -123,12 +97,12 @@ public class KgVersionServiceImpl implements KgVersionService { |
|
|
|
public void incrementalUpdate(Long id) { |
|
|
|
//当前版本 |
|
|
|
KnowledgeGraphVersion currentVersion = knowledgeGraphVersionDao.queryById(id); |
|
|
|
String transactionId = currentVersion.getTransactionId(); |
|
|
|
Integer version = currentVersion.getVersion(); |
|
|
|
String transactionId = currentVersion.getTransactionId(); |
|
|
|
// 找出在线的版本 |
|
|
|
KnowledgeGraphVersion onlineVersion = knowledgeGraphVersionDao.onlineFind(); |
|
|
|
if (ObjectUtil.isNotNull(onlineVersion)&&ObjectUtil.isNotEmpty(onlineVersion.getTransactionId()) |
|
|
|
&&ObjectUtil.isNotNull(onlineVersion.getVersion())) { |
|
|
|
if (ObjectUtil.isNotNull(onlineVersion) && ObjectUtil.isNotEmpty(onlineVersion.getTransactionId()) |
|
|
|
&& ObjectUtil.isNotNull(onlineVersion.getVersion())) { |
|
|
|
// 存在在线的版本,则取在线版本的version和transactionId |
|
|
|
transactionId = onlineVersion.getTransactionId(); |
|
|
|
version = onlineVersion.getVersion(); |
|
|
|
@@ -138,9 +112,7 @@ public class KgVersionServiceImpl implements KgVersionService { |
|
|
|
knowledgeGraphVersionDao.update(currentVersion); |
|
|
|
}else { |
|
|
|
//当前版本为在线版本 |
|
|
|
currentVersion.setUpdateMethod(KgUpdateMethod.INCREMENTAL_UPDATE.getCode()); |
|
|
|
currentVersion.setStatus(KgStatus.ONLINE.getCode()); |
|
|
|
knowledgeGraphVersionDao.update(currentVersion); |
|
|
|
changeStatus(currentVersion.getId(), KgStatus.ONLINE.getCode(), KgUpdateMethod.INCREMENTAL_UPDATE.getCode()); |
|
|
|
} |
|
|
|
//解析表格数据保存到neo4j |
|
|
|
saveToNeo4j(currentVersion.getContent(), version, transactionId); |
|
|
|
@@ -150,49 +122,76 @@ public class KgVersionServiceImpl implements KgVersionService { |
|
|
|
@Transactional |
|
|
|
public void rollback(Long versionId) { |
|
|
|
//在线的版本置为离线 |
|
|
|
KnowledgeGraphVersion knowledgeGraphVersion = knowledgeGraphVersionDao.onlineFind(); |
|
|
|
knowledgeGraphVersion.setStatus(KgStatus.OFFLINE.getCode()); |
|
|
|
knowledgeGraphVersionDao.update(knowledgeGraphVersion); |
|
|
|
KnowledgeGraphVersion onlineVersion = knowledgeGraphVersionDao.onlineFind(); |
|
|
|
changeStatus(onlineVersion.getId(), KgStatus.OFFLINE.getCode(),null); |
|
|
|
//该版本置为在线 |
|
|
|
KnowledgeGraphVersion onlineVersion = new KnowledgeGraphVersion(); |
|
|
|
onlineVersion.setId(versionId); |
|
|
|
onlineVersion.setStatus(KgStatus.ONLINE.getCode()); |
|
|
|
changeStatus(versionId,KgStatus.ONLINE.getCode(),null); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@Transactional |
|
|
|
public void delete(Long versionId) { |
|
|
|
knowledgeGraphVersionDao.deleteById(versionId); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 状态切换 |
|
|
|
* |
|
|
|
* @param id |
|
|
|
* @param status |
|
|
|
* @param updateMethod |
|
|
|
*/ |
|
|
|
private void changeStatus(Long id, Integer status, Integer updateMethod) { |
|
|
|
KnowledgeGraphVersion knowledgeGraphVersion = new KnowledgeGraphVersion(); |
|
|
|
knowledgeGraphVersion.setId(id); |
|
|
|
knowledgeGraphVersion.setStatus(status); |
|
|
|
knowledgeGraphVersion.setUpdateMethod(updateMethod); |
|
|
|
knowledgeGraphVersionDao.update(knowledgeGraphVersion); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 从minio中获取数据 |
|
|
|
* |
|
|
|
* @param path |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
private List<Map<String, String>> queryCsvDataFromMinio(String path) { |
|
|
|
List<Map<String, String>> csvData; |
|
|
|
try { |
|
|
|
ResponseEntity<InputStreamResource> inputStreamResourceResponseEntity = minioService.downloadFile(bucketName, path); |
|
|
|
InputStreamResource body = inputStreamResourceResponseEntity.getBody(); |
|
|
|
InputStream inputStream = body.getInputStream(); |
|
|
|
csvData = CsvParser.parseCsvToMap(inputStream); |
|
|
|
} catch (Exception e) { |
|
|
|
csvData = new ArrayList<>(); |
|
|
|
} |
|
|
|
return csvData; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 解析表格数据保存到neo4j |
|
|
|
*/ |
|
|
|
private void saveToNeo4j(String content,int version,String transactionId) { |
|
|
|
JSONArray neo4jContents = JSONUtil.parseArray(content); |
|
|
|
if (CollectionUtil.isEmpty(neo4jContents)||neo4jContents.size()<=1){ |
|
|
|
private void saveToNeo4j(String content, int version, String transactionId) { |
|
|
|
List<Map<String, String>> csvData = queryCsvDataFromMinio(content); |
|
|
|
if (CollectionUtil.isEmpty(csvData)) { |
|
|
|
return; |
|
|
|
} |
|
|
|
//去掉表头 |
|
|
|
neo4jContents.remove(0); |
|
|
|
Map<String, Person> personMap = new HashMap<>(); |
|
|
|
// 把当前内容合并到在线版本 |
|
|
|
neo4jContents.forEach(item -> { |
|
|
|
JSONArray lineData = JSONUtil.parseArray(item); |
|
|
|
String name = String.valueOf(lineData.get(0)); |
|
|
|
String age = String.valueOf((lineData.get(1))); |
|
|
|
String relationType = String.valueOf(lineData.get(2)); |
|
|
|
String relatedPerson = String.valueOf(lineData.get(3)); |
|
|
|
String since = lineData.get(4) != null ? String.valueOf(lineData.get(4)) : "0"; |
|
|
|
csvData.forEach(item -> { |
|
|
|
String name = item.get("name"); |
|
|
|
String age = item.get("age"); |
|
|
|
String relationType = item.get("relation_type"); |
|
|
|
String relatedPerson = item.get("friends_with"); |
|
|
|
String since = item.get("since"); |
|
|
|
|
|
|
|
// 创建或获取 Person 节点 |
|
|
|
Person person = personMap.computeIfAbsent(name, k -> new Person(name, age, new ArrayList<>())); |
|
|
|
person.setVersion(version); |
|
|
|
person.setTransactionId(transactionId); |
|
|
|
Person person = personMap.computeIfAbsent(name, k -> new Person(name, age, new ArrayList<>(), version, transactionId)); |
|
|
|
|
|
|
|
// 如果存在关系,则创建关系 |
|
|
|
if (relationType != null && relatedPerson != null) { |
|
|
|
Person related = personMap.computeIfAbsent(relatedPerson, k -> new Person(relatedPerson, "0", new ArrayList<>())); |
|
|
|
FriendsWith relation = new FriendsWith(related, since); |
|
|
|
related.setTransactionId(transactionId); |
|
|
|
related.setVersion(version); |
|
|
|
relation.setTransactionId(transactionId); |
|
|
|
relation.setVersion(version); |
|
|
|
Person related = personMap.computeIfAbsent(relatedPerson, k -> new Person(relatedPerson, "0", new ArrayList<>(), version, transactionId)); |
|
|
|
FriendsWith relation = new FriendsWith(related, since, version, transactionId); |
|
|
|
relation.setRelationType(relationType); // 动态设置关系类型 |
|
|
|
person.getRelations().add(relation); |
|
|
|
} |
|
|
|
@@ -215,6 +214,7 @@ public class KgVersionServiceImpl implements KgVersionService { |
|
|
|
|
|
|
|
/** |
|
|
|
* 下一个升级的版本 |
|
|
|
* |
|
|
|
* @param kgId |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
|