|
|
|
@@ -1,10 +1,12 @@ |
|
|
|
import fire |
|
|
|
import os |
|
|
|
import fire |
|
|
|
import joblib |
|
|
|
import zipfile |
|
|
|
import numpy as np |
|
|
|
import learnware |
|
|
|
|
|
|
|
from sklearn import svm |
|
|
|
from shutil import copyfile, rmtree |
|
|
|
|
|
|
|
import learnware |
|
|
|
from learnware.market import EasyMarket, BaseUserInfo |
|
|
|
from learnware.market import database_ops |
|
|
|
from learnware.learnware import Learnware |
|
|
|
@@ -76,14 +78,23 @@ class LearnwareMarketWorkflow: |
|
|
|
spec.save(os.path.join(dir_path, "svm.json")) |
|
|
|
|
|
|
|
init_file = os.path.join(dir_path, "__init__.py") |
|
|
|
os.system(f"cp example_init.py {init_file}") |
|
|
|
copyfile("example_init.py", init_file) # cp example_init.py init_file |
|
|
|
|
|
|
|
yaml_file = os.path.join(dir_path, "learnware.yaml") |
|
|
|
os.system(f"cp example.yaml {yaml_file}") |
|
|
|
copyfile("example.yaml", yaml_file) # cp example.yaml yaml_file |
|
|
|
|
|
|
|
zip_file = dir_path + ".zip" |
|
|
|
os.system(f"zip -q -r -j {zip_file} {dir_path}") |
|
|
|
os.system(f"rm -r {dir_path}") |
|
|
|
# zip -q -r -j zip_file dir_path |
|
|
|
with zipfile.ZipFile(zip_file, "w") as zip_obj: |
|
|
|
for foldername, subfolders, filenames in os.walk(dir_path): |
|
|
|
for filename in filenames: |
|
|
|
file_path = os.path.join(foldername, filename) |
|
|
|
zip_info = zipfile.ZipInfo(filename) |
|
|
|
zip_info.compress_type = zipfile.ZIP_STORED |
|
|
|
with open(file_path, "rb") as file: |
|
|
|
zip_obj.writestr(zip_info, file.read()) |
|
|
|
|
|
|
|
rmtree(dir_path) # rm -r dir_path |
|
|
|
|
|
|
|
self.zip_path_list.append(zip_file) |
|
|
|
|
|
|
|
@@ -120,8 +131,13 @@ class LearnwareMarketWorkflow: |
|
|
|
|
|
|
|
idx, zip_path = 1, self.zip_path_list[1] |
|
|
|
unzip_dir = os.path.join(test_folder, f"{idx}") |
|
|
|
|
|
|
|
# unzip -o -q zip_path -d unzip_dir |
|
|
|
if os.path.exists(unzip_dir): |
|
|
|
rmtree(unzip_dir) |
|
|
|
os.makedirs(unzip_dir, exist_ok=True) |
|
|
|
os.system(f"unzip -o -q {zip_path} -d {unzip_dir}") |
|
|
|
with zipfile.ZipFile(zip_path, "r") as zip_obj: |
|
|
|
zip_obj.extractall(path=unzip_dir) |
|
|
|
|
|
|
|
user_info = BaseUserInfo(id="user_0", semantic_spec=user_senmantic) |
|
|
|
_, single_learnware_list, _ = easy_market.search_learnware(user_info) |
|
|
|
@@ -131,11 +147,11 @@ class LearnwareMarketWorkflow: |
|
|
|
for learnware in single_learnware_list: |
|
|
|
print("Choose learnware:", learnware.id, learnware.get_specification().get_semantic_spec()) |
|
|
|
|
|
|
|
os.system(f"rm -r {test_folder}") |
|
|
|
rmtree(test_folder) # rm -r test_folder |
|
|
|
|
|
|
|
def test_stat_search(self, learnware_num=5): |
|
|
|
self._init_learnware_market() |
|
|
|
self.prepare_learnware_randomly(learnware_num) |
|
|
|
self.test_upload_delete_learnware(learnware_num) |
|
|
|
|
|
|
|
print(self.zip_path_list) |
|
|
|
easy_market = EasyMarket() |
|
|
|
@@ -145,8 +161,13 @@ class LearnwareMarketWorkflow: |
|
|
|
|
|
|
|
for idx, zip_path in enumerate(self.zip_path_list): |
|
|
|
unzip_dir = os.path.join(test_folder, f"{idx}") |
|
|
|
|
|
|
|
# unzip -o -q zip_path -d unzip_dir |
|
|
|
if os.path.exists(unzip_dir): |
|
|
|
rmtree(unzip_dir) |
|
|
|
os.makedirs(unzip_dir, exist_ok=True) |
|
|
|
os.system(f"unzip -o -q {zip_path} -d {unzip_dir}") |
|
|
|
with zipfile.ZipFile(zip_path, "r") as zip_obj: |
|
|
|
zip_obj.extractall(path=unzip_dir) |
|
|
|
|
|
|
|
user_spec = specification.rkme.RKMEStatSpecification() |
|
|
|
user_spec.load(os.path.join(unzip_dir, "svm.json")) |
|
|
|
@@ -161,7 +182,7 @@ class LearnwareMarketWorkflow: |
|
|
|
mixture_id = " ".join([learnware.id for learnware in mixture_learnware_list]) |
|
|
|
print(f"mixture_learnware: {mixture_id}\n") |
|
|
|
|
|
|
|
os.system(f"rm -r {test_folder}") |
|
|
|
rmtree(test_folder) # rm -r test_folder |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|