#pragma once #include #include #include #include #include "File.h" #include "Task.h" #include "Array.h" enum SupervisorState { WorkspacesCreation, //0 Preparation, //1 Execution, //2 End //3 }; template class Supervisor : public Array { protected: SupervisorState state; public: virtual String getStatePrefix() { return String(""); } String printState() { switch (state) { case WorkspacesCreation: return String("WorkspacesCreation"); case Preparation: return String("Preparation"); case Execution: return String("Execution"); case End: return String("End"); default: return "?"; } } //- void print() { for (auto& elem : this->getElements()) elem->print(); } void init(const char* fileName, int recordSize) { state = WorkspacesCreation; File* packedTasks = new File(fileName); Text* lines = packedTasks->readLines(); const long length = lines->getLength() / recordSize; int offset = 0; for (int i = 0; i < length; ++i) { this->add(new T(lines, offset)); offset += recordSize; } delete packedTasks; delete lines; } void changeState() { switch (this->state) { case WorkspacesCreation: this->state = Preparation; saveState(); break; case Preparation: this->state = Execution; saveState(); break; case Execution: this->state = End; saveState(); break; default: this->state = End; break; } } void DoWithSchedule(int maxKernels) { saveState(); // подготовка тестов while (this->state != Execution) { for (auto& task : this->getElements()) { switch (this->state) { case WorkspacesCreation: if (task->getState() == Waiting) { task->createWorkspace(); task->setState(WorkspaceCreated); } break; case Preparation: if (task->getState() == WorkspaceCreated) { task->prepareWorkspace(); task->createLaunchScript(); task->setState(WorkspaceReady); } break; default: //printf("id = %ld; state = %d\n", task->getId(), task->getState()); break; } } changeState(); } map, std::greater> sortedByKernelNeeds; long activeTasks = 0; long done = 0; for (auto& task : this->getElements()) { if (task->getState() == WorkspaceReady) { activeTasks++; sortedByKernelNeeds[task->getKernels()].push(task); } } printf("total tasks count = %ld, active task count %ld, maxkernels %d\n", this->getLength(), activeTasks, maxKernels); int busyKernels = 0; set activeTaskSet; bool ignoreCheck = true; String pathRes("results"); Utils::Mkdir(pathRes); string buf; while (activeTasks) { long oldActiveTasks = activeTasks; vector emptyKeys; //ставим задачи от больших к меньшему по ядрам for (auto& elem : sortedByKernelNeeds) { int freeKernels = maxKernels - busyKernels; int kernelsNeeded = elem.first; while (kernelsNeeded <= freeKernels && elem.second.size()) { T* task = elem.second.front(); elem.second.pop(); activeTaskSet.insert(task); task->Start(ignoreCheck); printf("start task with %d kernels and id %ld\n", task->getKernels(), task->getId()); busyKernels += task->getKernels(); freeKernels = maxKernels - busyKernels; } if (elem.second.size() == 0) emptyKeys.push_back(kernelsNeeded); //если ядер не осталось, то нет смысла дальше смотреть if (freeKernels == 0) break; } // очищаем от пустых ключей for (auto& empty : emptyKeys) sortedByKernelNeeds.erase(empty); // проверяем нет ли завершившихся задач for (auto it = activeTaskSet.begin(); it != activeTaskSet.end(); ) { T* task = *(it); if (task->Check()) { it++; activeTaskSet.erase(task); activeTasks--; done++; busyKernels -= task->getKernels(); printf(" done task with %d kernels and id %ld\n", task->getKernels(), task->getId()); buf += to_string(task->getId()) + " " + string(task->printState().getCharArray()) + " " + to_string(task->getTotalTime()) + "\n"; task->copyResults(pathRes); continue; } it++; } if (oldActiveTasks != activeTasks) printf("done %ld / %ld\n", done, this->getLength()); } changeState(); String outFile(pathRes + "/"+getStatePrefix()+"Info.txt"); File tmp(outFile, String(buf.c_str())); } void saveState() { Utils::Sleep(1); //чтобы не было одинаковых по дате файлов. String stateFile = packageWorkspace + "/state/" + getStatePrefix() + printState(); File tmp(stateFile, Utils::getDate()); } };