You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

nodeserver.go 9.6 kB

1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. package rclone
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "os"
  6. "os/exec"
  7. "strings"
  8. v1 "k8s.io/api/core/v1"
  9. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  10. "k8s.io/klog"
  11. "github.com/container-storage-interface/spec/lib/go/csi"
  12. "golang.org/x/net/context"
  13. "google.golang.org/grpc/codes"
  14. "google.golang.org/grpc/status"
  15. "k8s.io/client-go/tools/clientcmd"
  16. "k8s.io/kubernetes/pkg/util/mount"
  17. "k8s.io/kubernetes/pkg/volume/util"
  18. csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
  19. )
  // nodeServer is the receiver type for the node-side volume handlers defined
  // in this file: NodePublishVolume, NodeUnpublishVolume, NodeStageVolume and
  // NodeUnstageVolume.
  20. type nodeServer struct {
  // Embedded csi-common node server.
  // NOTE(review): presumably supplies the remaining CSI node RPCs - confirm.
  21. *csicommon.DefaultNodeServer
  // mounter is assigned by NodePublishVolume right before it unmounts a target
  // path whose existing mount could not be read.
  22. mounter *mount.SafeFormatAndMount
  23. }
  // mountPoint pairs a volume identifier with the path it is mounted at.
  type mountPoint struct {
  	// VolumeId is the identifier of the volume; MountPath is the mount location.
  	VolumeId, MountPath string
  }
  28. func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
  29. klog.Infof("NodePublishVolume: called with args %+v", *req)
  30. targetPath := req.GetTargetPath()
  31. notMnt, err := mount.New("").IsLikelyNotMountPoint(targetPath)
  32. if err != nil {
  33. if os.IsNotExist(err) {
  34. if err := os.MkdirAll(targetPath, 0750); err != nil {
  35. return nil, status.Error(codes.Internal, err.Error())
  36. }
  37. notMnt = true
  38. } else {
  39. return nil, status.Error(codes.Internal, err.Error())
  40. }
  41. }
  42. if !notMnt {
  43. // testing original mount point, make sure the mount link is valid
  44. if _, err := ioutil.ReadDir(targetPath); err == nil {
  45. klog.Infof("already mounted to target %s", targetPath)
  46. return &csi.NodePublishVolumeResponse{}, nil
  47. }
  48. // todo: mount link is invalid, now unmount and remount later (built-in functionality)
  49. klog.Warningf("ReadDir %s failed with %v, unmount this directory", targetPath, err)
  50. ns.mounter = &mount.SafeFormatAndMount{
  51. Interface: mount.New(""),
  52. Exec: mount.NewOsExec(),
  53. }
  54. if err := ns.mounter.Unmount(targetPath); err != nil {
  55. klog.Errorf("Unmount directory %s failed with %v", targetPath, err)
  56. return nil, err
  57. }
  58. }
  59. mountOptions := req.GetVolumeCapability().GetMount().GetMountFlags()
  60. if req.GetReadonly() {
  61. mountOptions = append(mountOptions, "ro")
  62. }
  63. remote, remotePath, configData, flags, e := extractFlags(req.GetVolumeContext())
  64. if e != nil {
  65. klog.Warningf("storage parameter error: %s", e)
  66. return nil, e
  67. }
  68. e = Mount(remote, remotePath, targetPath, configData, flags)
  69. if e != nil {
  70. if os.IsPermission(e) {
  71. return nil, status.Error(codes.PermissionDenied, e.Error())
  72. }
  73. if strings.Contains(e.Error(), "invalid argument") {
  74. return nil, status.Error(codes.InvalidArgument, e.Error())
  75. }
  76. return nil, status.Error(codes.Internal, e.Error())
  77. }
  78. return &csi.NodePublishVolumeResponse{}, nil
  79. }
  80. // extractFlags extracts the flags from the given volumeContext
  81. // Retturns: remote, remotePath, configData, flags, error
  82. func extractFlags(volumeContext map[string]string) (string, string, string, map[string]string, error) {
  83. // Load default connection settings from secret
  84. var secret *v1.Secret
  85. if secretName, ok := volumeContext["secretName"]; ok {
  86. // Load the secret that the PV spec defines
  87. var e error
  88. secret, e = getSecret(secretName)
  89. if e != nil {
  90. // if the user explicitly requested a secret and there is an error fetching it, bail with an error
  91. return "", "", "", nil, e
  92. }
  93. } else {
  94. // use rclone-secret as the default secret if none was defined
  95. secret, _ = getSecret("rclone-secret")
  96. }
  97. // Empty argument list
  98. flags := make(map[string]string)
  99. // Secret values are default, gets merged and overriden by corresponding PV values
  100. if secret != nil && secret.Data != nil && len(secret.Data) > 0 {
  101. // Needs byte to string casting for map values
  102. for k, v := range secret.Data {
  103. flags[k] = string(v)
  104. }
  105. } else {
  106. klog.Infof("No csi-rclone connection defaults secret found.")
  107. }
  108. if len(volumeContext) > 0 {
  109. for k, v := range volumeContext {
  110. flags[k] = v
  111. }
  112. }
  113. if e := validateFlags(flags); e != nil {
  114. return "", "", "", flags, e
  115. }
  116. remote := flags["remote"]
  117. remotePath := flags["remotePath"]
  118. if remotePathSuffix, ok := flags["remotePathSuffix"]; ok {
  119. remotePath = remotePath + remotePathSuffix
  120. delete(flags, "remotePathSuffix")
  121. }
  122. configData := ""
  123. ok := false
  124. if configData, ok = flags["configData"]; ok {
  125. delete(flags, "configData")
  126. }
  127. delete(flags, "remote")
  128. delete(flags, "remotePath")
  129. delete(flags, "secretName")
  130. return remote, remotePath, configData, flags, nil
  131. }
  132. func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
  133. klog.Infof("NodeUnPublishVolume: called with args %+v", *req)
  134. targetPath := req.GetTargetPath()
  135. if len(targetPath) == 0 {
  136. return nil, status.Error(codes.InvalidArgument, "NodeUnpublishVolume Target Path must be provided")
  137. }
  138. m := mount.New("")
  139. notMnt, err := m.IsLikelyNotMountPoint(targetPath)
  140. if err != nil && !mount.IsCorruptedMnt(err) {
  141. return nil, status.Error(codes.Internal, err.Error())
  142. }
  143. if notMnt && !mount.IsCorruptedMnt(err) {
  144. klog.Infof("Volume not mounted")
  145. } else {
  146. err = util.UnmountPath(req.GetTargetPath(), m)
  147. if err != nil {
  148. klog.Infof("Error while unmounting path: %s", err)
  149. // This will exit and fail the NodeUnpublishVolume making it to retry unmount on the next api schedule trigger.
  150. // Since we mount the volume with allow-non-empty now, we could skip this one too.
  151. return nil, status.Error(codes.Internal, err.Error())
  152. }
  153. klog.Infof("Volume %s unmounted successfully", req.VolumeId)
  154. }
  155. return &csi.NodeUnpublishVolumeResponse{}, nil
  156. }
  157. func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
  158. klog.Infof("NodeUnstageVolume: called with args %+v", *req)
  159. return &csi.NodeUnstageVolumeResponse{}, nil
  160. }
  161. func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
  162. klog.Infof("NodeStageVolume: called with args %+v", *req)
  163. return &csi.NodeStageVolumeResponse{}, nil
  164. }
  165. func validateFlags(flags map[string]string) error {
  166. if _, ok := flags["remote"]; !ok {
  167. return status.Errorf(codes.InvalidArgument, "missing volume context value: remote")
  168. }
  169. if _, ok := flags["remotePath"]; !ok {
  170. return status.Errorf(codes.InvalidArgument, "missing volume context value: remotePath")
  171. }
  172. return nil
  173. }
  174. func getSecret(secretName string) (*v1.Secret, error) {
  175. clientset, e := GetK8sClient()
  176. if e != nil {
  177. return nil, status.Errorf(codes.Internal, "can not create kubernetes client: %s", e)
  178. }
  179. kubeconfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  180. clientcmd.NewDefaultClientConfigLoadingRules(),
  181. &clientcmd.ConfigOverrides{},
  182. )
  183. namespace, _, err := kubeconfig.Namespace()
  184. if err != nil {
  185. return nil, status.Errorf(codes.Internal, "can't get current namespace, error %s", secretName, err)
  186. }
  187. klog.Infof("Loading csi-rclone connection defaults from secret %s/%s", namespace, secretName)
  188. secret, e := clientset.CoreV1().
  189. Secrets(namespace).
  190. Get(secretName, metav1.GetOptions{})
  191. if e != nil {
  192. return nil, status.Errorf(codes.Internal, "can't load csi-rclone settings from secret %s: %s", secretName, e)
  193. }
  194. return secret, nil
  195. }
  196. // Mount routine.
  197. func Mount(remote string, remotePath string, targetPath string, configData string, flags map[string]string) error {
  198. mountCmd := "rclone"
  199. mountArgs := []string{}
  200. defaultFlags := map[string]string{}
  201. defaultFlags["cache-info-age"] = "72h"
  202. defaultFlags["cache-chunk-clean-interval"] = "15m"
  203. defaultFlags["dir-cache-time"] = "5s"
  204. defaultFlags["vfs-cache-mode"] = "writes"
  205. defaultFlags["allow-non-empty"] = "true"
  206. defaultFlags["allow-other"] = "true"
  207. remoteWithPath := fmt.Sprintf(":%s:%s", remote, remotePath)
  208. if strings.Contains(configData, "["+remote+"]") {
  209. remoteWithPath = fmt.Sprintf("%s:%s", remote, remotePath)
  210. klog.Infof("remote %s found in configData, remoteWithPath set to %s", remote, remoteWithPath)
  211. }
  212. // rclone mount remote:path /path/to/mountpoint [flags]
  213. mountArgs = append(
  214. mountArgs,
  215. "mount",
  216. remoteWithPath,
  217. targetPath,
  218. "--daemon",
  219. )
  220. // If a custom flag configData is defined,
  221. // create a temporary file, fill it with configData content,
  222. // and run rclone with --config <tmpfile> flag
  223. if configData != "" {
  224. configFile, err := ioutil.TempFile("", "rclone.conf")
  225. if err != nil {
  226. return err
  227. }
  228. // Normally, a defer os.Remove(configFile.Name()) should be placed here.
  229. // However, due to a rclone mount --daemon flag, rclone forks and creates a race condition
  230. // with this nodeplugin proceess. As a result, the config file gets deleted
  231. // before it's reread by a forked process.
  232. if _, err := configFile.Write([]byte(configData)); err != nil {
  233. return err
  234. }
  235. if err := configFile.Close(); err != nil {
  236. return err
  237. }
  238. mountArgs = append(mountArgs, "--config", configFile.Name())
  239. }
  240. // Add default flags
  241. for k, v := range defaultFlags {
  242. // Exclude overriden flags
  243. if _, ok := flags[k]; !ok {
  244. mountArgs = append(mountArgs, fmt.Sprintf("--%s=%s", k, v))
  245. }
  246. }
  247. // Add user supplied flags
  248. for k, v := range flags {
  249. mountArgs = append(mountArgs, fmt.Sprintf("--%s=%s", k, v))
  250. }
  251. // create target, os.Mkdirall is noop if it exists
  252. err := os.MkdirAll(targetPath, 0750)
  253. if err != nil {
  254. return err
  255. }
  256. klog.Infof("executing mount command cmd=%s, remote=%s, targetpath=%s", mountCmd, remoteWithPath, targetPath)
  257. out, err := exec.Command(mountCmd, mountArgs...).CombinedOutput()
  258. if err != nil {
  259. return fmt.Errorf("mounting failed: %v cmd: '%s' remote: '%s' targetpath: %s output: %q",
  260. err, mountCmd, remoteWithPath, targetPath, string(out))
  261. }
  262. return nil
  263. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。