planner_improve #1

Merged
M merged 10 commits from planner_improve into main 2023-12-03 16:20:11 +00:00
Showing only changes of commit 8d5cf5fa4c - Show all commits

View File

@@ -9,229 +9,229 @@
enum SupervisorState { enum SupervisorState {
WorkspacesCreation, //0 WorkspacesCreation, //0
Preparation, //1 Preparation, //1
Execution, //2 Execution, //2
Archivation, //3 Archivation, //3
End //4 End //4
}; };
template <class T> template <class T>
class Supervisor : public Array <T> { class Supervisor : public Array <T> {
protected: protected:
SupervisorState state; SupervisorState state;
public: public:
virtual String getStatePrefix() { virtual String getStatePrefix() {
return String(""); return String("");
} }
String printState() { String printState() {
switch (state) { switch (state) {
case WorkspacesCreation: case WorkspacesCreation:
return String("WorkspacesCreation"); return String("WorkspacesCreation");
case Preparation: case Preparation:
return String("Preparation"); return String("Preparation");
case Execution: case Execution:
return String("Execution"); return String("Execution");
case Archivation: case Archivation:
return String("Archivation"); return String("Archivation");
case End: case End:
return String("End"); return String("End");
default: default:
return "?"; return "?";
} }
} }
//- //-
void print() { void print() {
for (auto& elem : this->getElements()) for (auto& elem : this->getElements())
elem->print(); elem->print();
} }
void init(const char* fileName, int recordSize) { void init(const char* fileName, int recordSize) {
state = WorkspacesCreation; state = WorkspacesCreation;
File* packedTasks = new File(fileName); File* packedTasks = new File(fileName);
Text* lines = packedTasks->readLines(); Text* lines = packedTasks->readLines();
const long length = lines->getLength() / recordSize; const long length = lines->getLength() / recordSize;
int offset = 0; int offset = 0;
for (int i = 0; i < length; ++i) { for (int i = 0; i < length; ++i) {
this->add(new T(lines, offset)); this->add(new T(lines, offset));
offset += recordSize; offset += recordSize;
} }
delete packedTasks; delete packedTasks;
delete lines; delete lines;
} }
void changeState() { void changeState() {
switch (this->state) { switch (this->state) {
case WorkspacesCreation: case WorkspacesCreation:
this->state = Preparation; this->state = Preparation;
saveState(); saveState();
break; break;
case Preparation: case Preparation:
this->state = Execution; this->state = Execution;
saveState(); saveState();
break; break;
case Execution: case Execution:
Finalize(); Finalize();
this->state = End; this->state = End;
saveState(); saveState();
break; break;
default: default:
this->state = End; this->state = End;
break; break;
} }
} }
void Do() { void Do() {
saveState(); saveState();
long activeCount = 0; long activeCount = 0;
//todo обязательно убрать отладочную печать. //todo обязательно убрать отладочную печать.
printf("tasks count = %ld\n", this->getLength()); printf("tasks count = %ld\n", this->getLength());
while (this->state != End) { while (this->state != End) {
// printf("state=%d\n", this->state); // printf("state=%d\n", this->state);
// printf("max=%d; busy=%d; free=%d\n", maxKernels, busyKernels, freeKernels); // printf("max=%d; busy=%d; free=%d\n", maxKernels, busyKernels, freeKernels);
activeCount = 0; activeCount = 0;
for (long i = 0; i < this->getLength(); ++i) { for (long i = 0; i < this->getLength(); ++i) {
T* task = this->get(i); T* task = this->get(i);
switch (this->state) { switch (this->state) {
case WorkspacesCreation: case WorkspacesCreation:
if (task->getState() == Waiting) { if (task->getState() == Waiting) {
activeCount++; activeCount++;
task->createWorkspace(); task->createWorkspace();
task->setState(WorkspaceCreated); task->setState(WorkspaceCreated);
} }
break; break;
case Preparation: case Preparation:
if (task->getState() == WorkspaceCreated) { if (task->getState() == WorkspaceCreated) {
activeCount++; activeCount++;
task->prepareWorkspace(); task->prepareWorkspace();
task->createLaunchScript(); task->createLaunchScript();
task->setState(WorkspaceReady); task->setState(WorkspaceReady);
} }
break; break;
case Execution: case Execution:
if (task->getState() == WorkspaceReady) { if (task->getState() == WorkspaceReady) {
activeCount++; activeCount++;
task->Start(); task->Start();
} }
else if (task->getState() == Running) { else if (task->getState() == Running) {
activeCount++; activeCount++;
task->Check(); task->Check();
} }
break; break;
default: default:
// printf("id = %ld; state = %d\n", task->getId(), task->getState()); // printf("id = %ld; state = %d\n", task->getId(), task->getState());
break; break;
} }
} }
// printf("active count = %d\n", activeCount); // printf("active count = %d\n", activeCount);
if (activeCount == 0) if (activeCount == 0)
changeState(); changeState();
Utils::Sleep(2); Utils::Sleep(2);
} }
} }
void DoWithSchedule(int maxKernels) { void DoWithSchedule(int maxKernels) {
saveState(); saveState();
// подготовка тестов // подготовка тестов
while (this->state != Execution) { while (this->state != Execution) {
for (auto& task : this->getElements()) { for (auto& task : this->getElements()) {
switch (this->state) { switch (this->state) {
case WorkspacesCreation: case WorkspacesCreation:
if (task->getState() == Waiting) { if (task->getState() == Waiting) {
task->createWorkspace(); task->createWorkspace();
task->setState(WorkspaceCreated); task->setState(WorkspaceCreated);
} }
break; break;
case Preparation: case Preparation:
if (task->getState() == WorkspaceCreated) { if (task->getState() == WorkspaceCreated) {
task->prepareWorkspace(); task->prepareWorkspace();
task->createLaunchScript(); task->createLaunchScript();
task->setState(WorkspaceReady); task->setState(WorkspaceReady);
} }
break; break;
default: default:
//printf("id = %ld; state = %d\n", task->getId(), task->getState()); //printf("id = %ld; state = %d\n", task->getId(), task->getState());
break; break;
} }
} }
changeState(); changeState();
} }
map<int, queue<T*>, std::greater<int>> sortedByKernelNeeds; map<int, queue<T*>, std::greater<int>> sortedByKernelNeeds;
long activeTasks = 0; long activeTasks = 0;
for (auto& task : this->getElements()) { for (auto& task : this->getElements()) {
if (task->getState() == WorkspaceReady) { if (task->getState() == WorkspaceReady) {
activeTasks++; activeTasks++;
sortedByKernelNeeds[task->getKernels()].push(task); sortedByKernelNeeds[task->getKernels()].push(task);
} }
} }
printf("total tasks count = %ld, active task count %ld, maxkernels %d\n", this->getLength(), activeTasks, maxKernels); printf("total tasks count = %ld, active task count %ld, maxkernels %d\n", this->getLength(), activeTasks, maxKernels);
int busyKernels = 0; int busyKernels = 0;
set<T*> activeTaskSet; set<T*> activeTaskSet;
bool ignoreCheck = true; bool ignoreCheck = true;
while (activeTasks) { while (activeTasks) {
vector<int> emptyKeys; vector<int> emptyKeys;
//ставим задачи от больших к меньшему по ядрам //ставим задачи от больших к меньшему по ядрам
for (auto& elem : sortedByKernelNeeds) { for (auto& elem : sortedByKernelNeeds) {
int freeKernels = maxKernels - busyKernels; int freeKernels = maxKernels - busyKernels;
int kernelsNeeded = elem.first; int kernelsNeeded = elem.first;
while (kernelsNeeded <= freeKernels && elem.second.size()) { while (kernelsNeeded <= freeKernels && elem.second.size()) {
T* task = elem.second.front(); T* task = elem.second.front();
elem.second.pop(); elem.second.pop();
activeTaskSet.insert(task); activeTaskSet.insert(task);
task->Start(ignoreCheck); task->Start(ignoreCheck);
printf("start task with %d kernels and id %ld\n", task->getKernels(), task->getId()); printf("start task with %d kernels and id %ld\n", task->getKernels(), task->getId());
busyKernels += task->getKernels(); busyKernels += task->getKernels();
freeKernels = maxKernels - busyKernels; freeKernels = maxKernels - busyKernels;
} }
if (elem.second.size() == 0) if (elem.second.size() == 0)
emptyKeys.push_back(kernelsNeeded); emptyKeys.push_back(kernelsNeeded);
//если ядер не осталось, то нет смысла дальше смотреть //если ядер не осталось, то нет смысла дальше смотреть
if (freeKernels == 0) if (freeKernels == 0)
break; break;
} }
// очищаем от пустых ключей // очищаем от пустых ключей
for (auto& empty : emptyKeys) for (auto& empty : emptyKeys)
sortedByKernelNeeds.erase(empty); sortedByKernelNeeds.erase(empty);
// проверяем нет ли завершившихся задач // проверяем нет ли завершившихся задач
for (auto it = activeTaskSet.begin(); it != activeTaskSet.end(); ) for (auto it = activeTaskSet.begin(); it != activeTaskSet.end(); )
{ {
T* task = *(it); T* task = *(it);
if (task->Check()) { if (task->Check()) {
it++; it++;
activeTaskSet.erase(task); activeTaskSet.erase(task);
activeTasks--; activeTasks--;
busyKernels -= task->getKernels(); busyKernels -= task->getKernels();
printf(" done task with %d kernels and id %ld\n", task->getKernels(), task->getId()); printf(" done task with %d kernels and id %ld\n", task->getKernels(), task->getId());
continue; continue;
} }
it++; it++;
} }
} }
changeState(); changeState();
} }
virtual void Finalize() {} virtual void Finalize() {}
void saveState() { void saveState() {
Utils::Sleep(1); //чтобы не было одинаковых по дате файлов. Utils::Sleep(1); //чтобы не было одинаковых по дате файлов.
String stateFile = packageWorkspace + "/state/" + getStatePrefix() + printState(); String stateFile = packageWorkspace + "/state/" + getStatePrefix() + printState();
//printf("stateFile=<%s>\n", stateFile.getCharArray()); //printf("stateFile=<%s>\n", stateFile.getCharArray());
File tmp(stateFile, Utils::getDate()); File tmp(stateFile, Utils::getDate());
} }
}; };