package TestingSystem.DVM; import Common.Constants; import Common.Global; import Common.Utils.Utils; import GlobalData.RemoteFile.RemoteFile; import GlobalData.Tasks.TaskState; import Repository.Server.ServerCode; import TestingSystem.DVM.Tasks.TestCompilationTask; import TestingSystem.DVM.Tasks.TestRunTask; import TestingSystem.DVM.Tasks.TestRunTaskInterface; import TestingSystem.DVM.Tasks.TestTask; import TestingSystem.DVM.TasksPackage.TasksPackage; import TestingSystem.DVM.TasksPackage.TasksPackageState; import TestingSystem.Common.Test.TestType; import TestingSystem.Common.TestingPlanner; import com.jcraft.jsch.ChannelSftp; import javafx.util.Pair; import org.apache.commons.io.FileUtils; import java.io.File; import java.nio.charset.Charset; import java.nio.file.Paths; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Vector; public class TestsSupervisor_2022 { protected TestingPlanner planner; //планировщик. protected UserConnection connection; protected TasksPackage tasksPackage; protected RemoteFile packageRemoteWorkspace; protected File packageLocalWorkspace; protected Vector compilationTasks; //список задач на компиляцию protected int count = 0; //число активных задач. //---- public TestsSupervisor_2022(TestingPlanner planner_in, UserConnection connection_in, TasksPackage tasksPackage_in, Vector tasks_in) { planner = planner_in; connection = connection_in; tasksPackage = tasksPackage_in; compilationTasks = tasks_in; planner.Print(getClass().getSimpleName() + ": найдено задач на компиляцию: " + compilationTasks.size()); packageRemoteWorkspace = new RemoteFile(tasksPackage.user_workspace + "/tests", tasksPackage.id, true); packageLocalWorkspace = Paths.get(Global.PackagesDirectory.getAbsolutePath(), tasksPackage.id).toFile(); } public boolean packageNeedsKill() throws Exception{ return (boolean) planner.ServerCommand(ServerCode.CheckPackageToKill,tasksPackage.id); } public void Perform() throws Exception { if (packageNeedsKill()){ System.out.println("PACKAGE "+tasksPackage.id+" NEEDS TO KILL"); if (!tasksPackage.pid.isEmpty()) { connection.ShellCommand("kill -9 " + tasksPackage.pid); } tasksPackage.state = TasksPackageState.Aborted; planner.UpdatePackage(); }else { switch (tasksPackage.state) { case TestsSynchronize: TestsSynchronize(); tasksPackage.state = TasksPackageState.PackageWorkspaceCreation; planner.UpdatePackage(); break; case PackageWorkspaceCreation: PackageWorkspaceCreation(); tasksPackage.state = TasksPackageState.PackageStart; planner.UpdatePackage(); break; case PackageStart: PackageStart(); tasksPackage.state = TasksPackageState.CompilationWorkspacesCreation; planner.UpdatePackage(); break; case CompilationWorkspacesCreation: case CompilationPreparation: case CompilationExecution: case RunningWorkspacesCreation: case RunningPreparation: case RunningExecution: checkNextState(); break; case RunningEnd: DownloadResults(); tasksPackage.state = TasksPackageState.Analysis; planner.UpdatePackage(); break; case Analysis: AnalyseResults(); tasksPackage.state = TasksPackageState.Done; planner.UpdatePackage(); break; default: break; } } } private void TestsSynchronize() throws Exception { //1, получить набор уникальных тестов. Vector test_ids = new Vector<>(); for (TestCompilationTask current_task : compilationTasks) if (!test_ids.contains(current_task.test_id)) test_ids.add(String.valueOf(current_task.test_id)); //синхронизировать их. for (String test_id : test_ids) { File test_src = Paths.get(Global.TestsDirectory.getAbsolutePath(), test_id).toFile(); RemoteFile test_dst = new RemoteFile(tasksPackage.user_workspace + "/projects/" + test_id, true); connection.MKDIR(test_dst); connection.SynchronizeSubDirsR(test_src, test_dst); } } private void PackageWorkspaceCreation() throws Exception { //создать папку для пакета. connection.sftpChannel.mkdir(packageRemoteWorkspace.full_name); //положить туда запакованные тексты задач. Vector compilationLines = new Vector<>(); Vector runLines = new Vector<>(); for (TestCompilationTask compilationTask : planner.packageTasks.values()) { compilationLines.addAll(compilationTask.pack(1)); for (TestRunTask runTask : compilationTask.runTasks) { int rt_kernels = (runTask.test_type == TestType.Performance) ? tasksPackage.kernels : Math.min(Utils.getMatrixProcessors(runTask.matrix), tasksPackage.kernels); runLines.addAll(runTask.pack(rt_kernels)); } } RemoteFile compilationPackage = new RemoteFile(packageRemoteWorkspace, "compilationTasks"); RemoteFile runPackage = new RemoteFile(packageRemoteWorkspace, "runTasks"); connection.writeToFile(String.join("\n", compilationLines) + "\n", compilationPackage); connection.writeToFile(String.join("\n", runLines) + "\n", runPackage); // -- connection.MKDIR(new RemoteFile(packageRemoteWorkspace, "state")); } private void PackageStart() throws Exception { String plannerStartCommand = String.join(" ", "nohup", Utils.DQuotes(planner.getPlanner()), Utils.DQuotes(tasksPackage.user_workspace), Utils.DQuotes(packageRemoteWorkspace.full_name), Utils.DQuotes(tasksPackage.kernels), Utils.DQuotes(tasksPackage.dvm_drv), "&" ); connection.ShellCommand(plannerStartCommand); RemoteFile PID = new RemoteFile(packageRemoteWorkspace, "PID"); while (!connection.Exists(packageRemoteWorkspace.full_name, "STARTED")){ System.out.println("waiting for package start..."); Utils.sleep(1000); } if (connection.Exists(packageRemoteWorkspace.full_name, "PID")){ tasksPackage.pid = connection.readFromFile(PID); } } public void checkNextState() throws Exception { TasksPackageState oldState = tasksPackage.state; Vector files_ = connection.sftpChannel.ls(packageRemoteWorkspace.full_name + "/state"); Vector files = new Vector<>(); for (ChannelSftp.LsEntry file : files_) { try { TasksPackageState.valueOf(file.getFilename()); files.add(file); } catch (Exception ignore) { } } files.sort(Comparator.comparingInt(o -> o.getAttrs().getMTime())); if (!files.isEmpty()) { String fileName = files.get(files.size() - 1).getFilename(); System.out.println(fileName + " last file"); tasksPackage.state = TasksPackageState.valueOf(files.get(files.size() - 1).getFilename()); if (tasksPackage.state != oldState) planner.UpdatePackage(); } } public void DownloadResults() throws Exception { Utils.CheckDirectory(packageLocalWorkspace); for (TestCompilationTask testCompilationTask : compilationTasks) { //------------>>> if (TryDownloadTask(testCompilationTask)) { for (TestRunTask testRunTask : testCompilationTask.runTasks) { TryDownloadTask(testRunTask); } }else { //задача на компиляцию не состоялась. значит и все ее задачи на запуск тоже. for (TestRunTask testRunTask : testCompilationTask.runTasks) { testRunTask.state = TaskState.Canceled; planner.UpdateTask(testRunTask); } } //--->>>>>>>>> } } public boolean TryDownloadTask(TestTask testTask) throws Exception { if ( !testTask.state.equals(TaskState.ResultsDownloaded) && !testTask.state.equals(TaskState.Canceled) ) { File taskLocalWorkspace = Paths.get(packageLocalWorkspace.getAbsolutePath(), String.valueOf(testTask.id)).toFile(); RemoteFile taskRemoteWorkspace = new RemoteFile(packageRemoteWorkspace.full_name, String.valueOf(testTask.id)); Utils.CheckDirectory(taskLocalWorkspace); if (connection.Exists(packageRemoteWorkspace.full_name, String.valueOf(testTask.id))) { CheckTaskFile(taskRemoteWorkspace, taskLocalWorkspace, Constants.out_file); CheckTaskFile(taskRemoteWorkspace, taskLocalWorkspace, Constants.err_file); CheckTaskFile(taskRemoteWorkspace, taskLocalWorkspace, Constants.time_file); CheckTaskFile(taskRemoteWorkspace, taskLocalWorkspace, "TaskState"); if (testTask instanceof TestRunTask) CheckTaskFile(taskRemoteWorkspace, taskLocalWorkspace, "sts.gz+"); testTask.state = TaskState.ResultsDownloaded; planner.UpdateTask(testTask); return true; } else { testTask.state = TaskState.Canceled; planner.UpdateTask(testTask); return false; //нет раб пространства. значит задача не выполнялась. } } else return true; } public void CheckTaskFile(RemoteFile taskRemoteWorkspace, File taskLocalWorkspace, String fileName) throws Exception { RemoteFile rFile = new RemoteFile(taskRemoteWorkspace.full_name, fileName); File lFile = Paths.get(taskLocalWorkspace.getAbsolutePath(), fileName).toFile(); if (connection.Exists(taskRemoteWorkspace.full_name, fileName)) { connection.getSingleFile(rFile, lFile, 0); } } public void AnalyseResults() throws Exception { System.out.println("analysing results"); int ct_count = 0; int rt_count = 0; for (TestCompilationTask testCompilationTask : compilationTasks) { ct_count++; if (CheckTask(testCompilationTask)) { planner.UpdateTask(testCompilationTask); for (TestRunTask testRunTask : testCompilationTask.runTasks) { rt_count++; testRunTask.compilation_state = testCompilationTask.state; testRunTask.compilation_output = testCompilationTask.output; testRunTask.compilation_errors = testCompilationTask.errors; if (testCompilationTask.state == TaskState.DoneWithErrors) { testRunTask.state = TaskState.Canceled; } else { CheckTask(testRunTask); } planner.UpdateTask(testRunTask); if (testRunTask.state.equals(TaskState.Finished)) { //анализ задачи на запуск. List output_lines = Arrays.asList(testRunTask.output.split("\n")); List errors_lines = Arrays.asList(testRunTask.errors.split("\n")); //--- if (TestRunTaskInterface.isCrushed(output_lines, errors_lines)) { testRunTask.state = TaskState.Crushed; } else { Pair results = new Pair<>(TaskState.Done, 100); switch (testRunTask.test_type) { case Correctness: results = TestRunTaskInterface.analyzeCorrectness(output_lines); break; case Performance: results = TestRunTaskInterface.analyzePerformance(output_lines); break; default: break; } testRunTask.state = results.getKey(); testRunTask.progress = results.getValue(); testRunTask.CleanTime = TestRunTaskInterface.parseCleanTime(testRunTask.output); } File local_sts_text = Utils.getTempFileName("sts_text"); Vector files = connection.sftpChannel.ls(testRunTask.remote_workspace); for (ChannelSftp.LsEntry file : files) { if (file.getFilename().equals("sts.gz+")) { RemoteFile remote_sts = new RemoteFile( testRunTask.remote_workspace, file.getFilename(), false); RemoteFile remote_sts_text = new RemoteFile( testRunTask.remote_workspace, "statistic.txt", false); try { connection.ShellCommand(Utils.DQuotes(tasksPackage.dvm_drv) + " pa " + Utils.DQuotes(remote_sts.full_name) + " " + Utils.DQuotes(remote_sts_text.full_name)); connection.getSingleFile(remote_sts_text, local_sts_text, 10240); } catch (Exception ex) { ex.printStackTrace(); } if (local_sts_text.exists()) { try { testRunTask.statistic = FileUtils.readFileToString(local_sts_text, Charset.defaultCharset()); } catch (Exception e) { e.printStackTrace(); } } break; } } } planner.UpdateTask(testRunTask); } } } System.out.println("ct_count=" + ct_count + " rt count=" + rt_count); } public boolean CheckTask(TestTask testTask) throws Exception { if (testTask.state.equals(TaskState.ResultsDownloaded)) { File taskWorkspace = Paths.get(packageLocalWorkspace.getAbsolutePath(), String.valueOf(testTask.id)).toFile(); System.out.println("id=" + testTask.id + ": path=" + taskWorkspace.getAbsolutePath()); File stateFile = Paths.get(taskWorkspace.getAbsolutePath(), "TaskState").toFile(); File outFile = Paths.get(taskWorkspace.getAbsolutePath(), Constants.out_file).toFile(); File errFile = Paths.get(taskWorkspace.getAbsolutePath(), Constants.err_file).toFile(); File timeFile = Paths.get(taskWorkspace.getAbsolutePath(), Constants.time_file).toFile(); if (outFile.exists()) testTask.output = FileUtils.readFileToString(outFile); if (errFile.exists()) testTask.errors = FileUtils.readFileToString(errFile); if (timeFile.exists()) testTask.Time = Double.parseDouble(Utils.ReadAllText(timeFile)); if (stateFile.exists()) { String stateText = FileUtils.readFileToString(stateFile, Charset.defaultCharset()).replace("\n", ""); testTask.state = TaskState.valueOf(stateText); } else testTask.state = TaskState.InternalError; //поменять на то что состояние не найдено. ? return true; } return false; } }