You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

mysql.go 12 kB


  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. _ "github.com/go-sql-driver/mysql"
  6. "github.com/jmoiron/sqlx"
  7. "gitlink.org.cn/cloudream/utils/consts"
  8. )
  9. // 数据库指针
  10. var db *sqlx.DB
  11. // 错误处理函数
  12. func HandleError(why string, err error) {
  13. if err != nil {
  14. fmt.Println(why, err)
  15. }
  16. }
  17. // 初始化数据库连接,init()方法系统会在动在main方法之前执行。
  18. func init() {
  19. database, err := sqlx.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/kx?charset=utf8mb4&parseTime=true")
  20. HandleError("open mysql failed,", err)
  21. db = database
  22. }
  23. // 节点延迟表插入
  24. func Insert_NodeDelay(innodeIP string, outnodeIP []string, delay []int) {
  25. insSql := "insert into NodeDelay values(?,?,?)"
  26. updateSql := "UPDATE NodeDelay SET DelayInMs=? WHERE InNodeIP=? AND OutNodeIP=?"
  27. for i := 0; i < len(outnodeIP); i++ {
  28. _, err := db.Exec(insSql, innodeIP, outnodeIP[i], delay[i])
  29. //HandleError("insert failed: ", err)
  30. if err != nil {
  31. _, e := db.Exec(updateSql, delay[i], innodeIP, outnodeIP[i])
  32. HandleError("update failed: ", e)
  33. }
  34. }
  35. }
  36. // 节点表插入
  37. func Insert_Node(nodeip string, nodelocation string, ipfsstatus string, localdirstatus string) {
  38. // 根据NodeIP查询,若不存在则插入,若存在则更新
  39. //查询
  40. type Node struct {
  41. NodeIP string `db:"NodeIP"`
  42. }
  43. var x Node
  44. sql := "select NodeIP from Node where NodeIP=?"
  45. err := db.Get(&x, sql, nodeip)
  46. HandleError("Get failed: ", err)
  47. //local和ipfs同时可达才可达
  48. // TODO 将status字段改成字符串(枚举值)
  49. NodeStatus := ipfsstatus == consts.IPFS_STATUS_OK && localdirstatus == consts.LOCAL_DIR_STATUS_OK
  50. //不存在才插入
  51. if x == (Node{}) {
  52. sql := "insert into Node values(?,?,?)"
  53. _, err := db.Exec(sql, nodeip, nodelocation, NodeStatus)
  54. HandleError("insert failed: ", err)
  55. } else {
  56. //存在则更新
  57. sql := "update Node set NodeStatus=? where NodeIP=?"
  58. _, err := db.Exec(sql, NodeStatus, nodeip)
  59. HandleError("update failed: ", err)
  60. }
  61. }
  62. // 纠删码对象表插入
  63. func Insert_EcObject(Object_Name string, Bucket_ID int, FileSizeInBytes int64, EcName string) (objectid int64) {
  64. // 根据objectname和bucketid查询,若不存在则插入,若存在则不操作
  65. //查询
  66. type Object struct {
  67. ObjectID int64 `db:"ObjectID"`
  68. ObjectName string `db:"ObjectName"`
  69. BucketID int `db:"BucketID"`
  70. }
  71. var x Object
  72. sql := "select ObjectID, ObjectName, BucketID from Object where ObjectName=? AND BucketID=?"
  73. err := db.Get(&x, sql, Object_Name, Bucket_ID)
  74. HandleError("Get failed: ", err)
  75. //不存在才插入
  76. if x == (Object{}) {
  77. sql := "insert into Object(ObjectName, BucketID, FileSizeInBytes, Redundancy, NumRep, EcName) values(?,?,?,?,?,?)"
  78. r, err := db.Exec(sql, Object_Name, Bucket_ID, FileSizeInBytes, false, "-1", EcName)
  79. HandleError("insert failed: ", err)
  80. id, err := r.LastInsertId()
  81. HandleError("exec failed: ", err)
  82. objectid = id
  83. } else {
  84. objectid = x.ObjectID
  85. }
  86. return
  87. }
  88. // 多副本对象表插入
  89. func Insert_RepObject(Object_Name string, Bucket_ID int, FileSizeInBytes int64, NumRep int) (objectid int64) {
  90. // 根据objectname和bucketid查询,若不存在则插入,若存在则不操作
  91. //查询
  92. type Object struct {
  93. ObjectID int64 `db:"ObjectID"`
  94. ObjectName string `db:"ObjectName"`
  95. BucketID int `db:"BucketID"`
  96. }
  97. var x Object
  98. sql := "select ObjectID, ObjectName, BucketID from Object where ObjectName=? AND BucketID=?"
  99. err := db.Get(&x, sql, Object_Name, Bucket_ID)
  100. HandleError("Get failed: ", err)
  101. //不存在才插入
  102. if x == (Object{}) {
  103. sql := "insert into Object(ObjectName, BucketID, FileSizeInBytes, Redundancy, NumRep) values(?,?,?,?,?)"
  104. r, err := db.Exec(sql, Object_Name, Bucket_ID, FileSizeInBytes, true, NumRep)
  105. HandleError("insert failed: ", err)
  106. id, err := r.LastInsertId()
  107. HandleError("exec failed: ", err)
  108. objectid = id
  109. } else {
  110. objectid = x.ObjectID
  111. }
  112. return
  113. }
  114. // 对象编码块表插入
  115. func Insert_EcObjectBlock(Object_ID int64, Inner_ID int) {
  116. // 根据objectID查询,若不存在则插入,若存在则不操作
  117. //查询
  118. type Block struct {
  119. ObjectID int64 `db:"ObjectID"`
  120. }
  121. var x []Block
  122. sql := "select ObjectID from ObjectBlock where ObjectID=? AND InnerID=?"
  123. err := db.Select(&x, sql, Object_ID, Inner_ID)
  124. HandleError("select failed: ", err)
  125. //不存在才插入
  126. if x == nil {
  127. sql := "insert into ObjectBlock(ObjectID, InnerID) values(?,?)"
  128. //执行SQL语句
  129. _, err := db.Exec(sql, Object_ID, Inner_ID)
  130. HandleError("insert failed: ", err)
  131. //查询最后一条用户ID,判断是否插入成功
  132. // id, err := r.LastInsertId()
  133. // HandleError("exec failed: ", err)
  134. // fmt.Println("insert EcObjectBlock succ: ", id)
  135. }
  136. }
  137. // 对象副本表插入
  138. func Insert_ObjectRep(Object_ID int64) {
  139. sql := "insert into ObjectRep(ObjectID) values(?)"
  140. _, err := db.Exec(sql, Object_ID)
  141. HandleError("insert failed: ", err)
  142. }
  143. // 对象编码块表Echash插入
  144. func Insert_EcHash(Object_ID int, Hashs []string) {
  145. for i := 0; i < len(Hashs); i++ {
  146. sql := "update ObjectBlock set BlockHash =? where ObjectID = ? AND InnerID = ?"
  147. _, err := db.Exec(sql, Hashs[i], Object_ID, i)
  148. HandleError("insert failed: ", err)
  149. }
  150. }
  151. // 对象副本表rephash插入
  152. func Insert_RepHash(Object_ID int, Hashs string) {
  153. sql := "update ObjectRep set RepHash =? where ObjectID = ?"
  154. _, err := db.Exec(sql, Hashs, Object_ID)
  155. HandleError("insert failed: ", err)
  156. }
  157. // 缓存表插入
  158. func Insert_Cache(Hashs []string, Ips []string, TempOrPin bool) {
  159. for i := 0; i < len(Hashs); i++ {
  160. sql := "insert into Cache values(?,?,?,?)"
  161. _, err := db.Exec(sql, Hashs[i], Ips[i], TempOrPin, time.Now())
  162. HandleError("insert failed: ", err)
  163. }
  164. return
  165. }
  166. func Query_ObjectID(objectname string) (objectid int) {
  167. type Object struct {
  168. ObjectID int `db:"ObjectID"`
  169. }
  170. var x Object
  171. sql := "select ObjectID from Object where ObjectName=? "
  172. err := db.Get(&x, sql, objectname)
  173. HandleError("Get failed: ", err)
  174. if x != (Object{}) {
  175. objectid = x.ObjectID
  176. } else {
  177. fmt.Println("Object not found!")
  178. }
  179. // fmt.Println("select bucketid succ:",bucketid)
  180. return
  181. }
  182. // 根据BucketName查询BucketID
  183. func Query_BucketID(bucketname string) (bucketid int) {
  184. //桶结构体
  185. type Bucket struct {
  186. BucketID int `db:"BucketID"`
  187. BucketName string `db:"BucketName"`
  188. }
  189. var x Bucket
  190. sql := "select BucketID, BucketName from Bucket where BucketName=? "
  191. err := db.Get(&x, sql, bucketname)
  192. HandleError("Get failed: ", err)
  193. if x != (Bucket{}) {
  194. bucketid = x.BucketID
  195. } else {
  196. fmt.Println("Bucket not found!")
  197. bucketid = -1
  198. }
  199. // fmt.Println("select bucketid succ:",bucketid)
  200. return
  201. }
  202. // 根据用户id查询可用nodeip
  203. func Query_UserNode(user_id int) []string {
  204. //用户节点结构体
  205. type UserNode struct {
  206. UserID int `db:"UserID"`
  207. NodeIP string `db:"NodeIP"`
  208. }
  209. var x []UserNode
  210. var node_ip []string
  211. sql := "select UserID, NodeIP from UserNode where UserID=? "
  212. err := db.Select(&x, sql, user_id)
  213. HandleError("select failed: ", err)
  214. for _, value := range x {
  215. node_ip = append(node_ip, value.NodeIP)
  216. }
  217. fmt.Println("select node_ip succ:", node_ip)
  218. return node_ip
  219. }
  220. // 根据objectname和bucketid查询对象表,获得redundancy,EcName,fileSizeInBytes
  221. func Query_Object(objectname string, bucketid int) (objectid int, filesizeinbytes int64, redundancy bool, ecname string) {
  222. //对象结构体
  223. type Object struct {
  224. ObjectID int `db:"ObjectID"`
  225. FileSizeInBytes int64 `db:"FileSizeInBytes"`
  226. Redundancy bool `db:"Redundancy"`
  227. EcName string `db:"EcName"`
  228. }
  229. var x Object
  230. sql := "select ObjectID, FileSizeInBytes, Redundancy, EcName from Object where ObjectName=? AND BucketID=?"
  231. err := db.Get(&x, sql, objectname, bucketid)
  232. HandleError("Get failed: ", err)
  233. if x != (Object{}) {
  234. objectid = x.ObjectID
  235. filesizeinbytes = x.FileSizeInBytes
  236. redundancy = x.Redundancy
  237. ecname = x.EcName
  238. } else {
  239. fmt.Println("Object not found!")
  240. }
  241. return
  242. }
  243. // 查询对象副本表
  244. func Query_ObjectRep(objectid int) (repHash string) {
  245. //对象结构体
  246. type ObjectRep struct {
  247. RepHash string `db:"RepHash"`
  248. }
  249. var x ObjectRep
  250. sql := "select RepHash from ObjectRep where ObjectID=?"
  251. err := db.Get(&x, sql, objectid)
  252. HandleError("Get failed: ", err)
  253. if x != (ObjectRep{}) {
  254. repHash = x.RepHash
  255. } else {
  256. fmt.Println("ObjectRep not found!")
  257. }
  258. return
  259. }
  260. // 对象编码块结构体
  261. type ObjectBlock struct {
  262. InnerID int `db:"InnerID"`
  263. BlockHash string `db:"BlockHash"`
  264. }
  265. // 查询对象编码块表
  266. func Query_ObjectBlock(Object_ID int) (x []ObjectBlock) {
  267. sql := "select InnerID, BlockHash from ObjectBlock where ObjectID=?"
  268. err := db.Select(&x, sql, Object_ID)
  269. HandleError("select failed: ", err)
  270. return
  271. }
  272. // 将时间字符串转化为时间戳格式(s)
  273. func Time_trans(time_string string) (timestamp int64) {
  274. timeTemplate1 := "2006-01-02 15:04:05"
  275. stamp, _ := time.ParseInLocation(timeTemplate1, time_string, time.Local) //使用parseInLocation将字符串格式化返回本地时区时间
  276. timestamp = stamp.Unix()
  277. return
  278. }
  279. // 缓存结构体
  280. type Cache struct {
  281. NodeIP string `db:"NodeIP"`
  282. TempOrPin bool `db:"TempOrPin"`
  283. Cachetime string `db:"Cachetime"`
  284. }
  285. // 查询缓存表
  286. func Query_Cache(BlockHash string) (x []Cache) {
  287. sql := "select NodeIP, TempOrPin, Cachetime from Cache where HashValue=?"
  288. err := db.Select(&x, sql, BlockHash)
  289. HandleError("Get failed: ", err)
  290. return
  291. }
  292. // 更新缓存表
  293. func Update_Cache(BlockHash string, nodeip string) (x Cache) {
  294. //根据hash和nodeip查询缓存表里是否存在此条记录
  295. sql := "select NodeIP, TempOrPin, Cachetime from Cache where HashValue=? AND NodeIP=?"
  296. err := db.Select(&x, sql, BlockHash, nodeip)
  297. HandleError("Get failed: ", err)
  298. //若在表中已存在且所对应的TempOrPin字段为true,则更新Time
  299. if x.TempOrPin == true {
  300. sql = "update Cache set Cachetime=? where HashValue=? AND NodeIP=?"
  301. _, err := db.Exec(sql, time.Now(), BlockHash, nodeip)
  302. HandleError("update failed: ", err)
  303. }
  304. return
  305. }
  306. // 查询节点延迟表
  307. func Query_NodeDelay(innodeip string, outnodeip string) (delay int) {
  308. //节点延迟结构体
  309. type NodeDelay struct {
  310. DelayInMs int `db:"DelayInMs"`
  311. }
  312. var x NodeDelay
  313. sql := "select DelayInMs from NodeDelay where InNodeIP=? AND OutNodeIP=?"
  314. err := db.Get(&x, sql, innodeip, outnodeip)
  315. HandleError("Get failed: ", err)
  316. if x != (NodeDelay{}) {
  317. delay = x.DelayInMs
  318. } else {
  319. fmt.Println("NodeDelay not found!")
  320. }
  321. return
  322. }
  323. /*
  324. func main(){
  325. // Insert_RepHash(1,"aaa")
  326. // Hashs := []string{"aaa","bbb","ddd"}
  327. // // ObjectId := Query_ObjectID(ObjectName)
  328. // // Insert_EcHash(ObjectId, Hashs)
  329. // // fmt.Println(BlockID[0])
  330. // Ips := []string{"chengdu","beijing","changsha"}
  331. // Insert_Cache(Hashs, Ips, false)
  332. // var BucketName string = "bucket01"
  333. // var ObjectName string = "bucket01"
  334. // BucketID := Query_BucketID(BucketName)
  335. // ObjectID, fileSizeInBytes, redundancy, EcName := Query_Object(ObjectName, BucketID)
  336. // fmt.Println(ObjectID,fileSizeInBytes,redundancy,EcName)
  337. // // repHash := Query_ObjectRep(ObjectID)
  338. // // fmt.Println(repHash)
  339. // if redundancy == false{
  340. // objectblock := Query_ObjectBlock(ObjectID)
  341. // var Destination string = "shanghai"
  342. // for _,value := range objectblock{
  343. // Cache := Query_Cache(value.BlockHash)
  344. // Delay := make(map[string]int) // 延迟集合
  345. // for i:=0; i<len(Cache); i++{
  346. // Delay[Cache[i].NodeIP] = Query_NodeDelay(Destination, Cache[i].NodeIP)
  347. // }
  348. // fmt.Println(len(Delay))
  349. // fmt.Println(value.InnerID, value.BlockHash, Cache[0].NodeIP, Cache[0].TempOrPin, Time_trans(Cache[0].Cachetime))
  350. // fmt.Println(value.InnerID, value.BlockHash, Cache[1].NodeIP, Cache[1].TempOrPin, Time_trans(Cache[1].Cachetime))
  351. // }
  352. // }
  353. // var ecN int = 5
  354. // for i:=0; i<ecN; i++{
  355. // Insert_EcObjectBlock(1, i)
  356. // }
  357. // fmt.Println(Query_BucketID("bucket01"))
  358. // a,b,d := "bucket01",123456,"Ec01"
  359. // var c int64 = 12345678987654321
  360. // fmt.Println(Insert_EcObject(a, b, c, d))
  361. // var s int
  362. // s = 123
  363. // c := Query_UserNode(s)
  364. // fmt.Println(c)
  365. }*/

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。