промежуточный. частичный рефакторинг инициализации. еще не готов.
This commit is contained in:
218
src/files/Supervisor.h
Normal file
218
src/files/Supervisor.h
Normal file
@@ -0,0 +1,218 @@
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
#include <queue>
|
||||
#include <math.h>
|
||||
#include "File.h"
|
||||
#include "Task.h"
|
||||
#include "Array.h"
|
||||
|
||||
enum SupervisorState {
|
||||
WorkspacesCreation, //0
|
||||
Preparation, //1
|
||||
Execution, //2
|
||||
End //3
|
||||
};
|
||||
|
||||
template <class T>
|
||||
class Supervisor : public Array <T> {
|
||||
protected:
|
||||
SupervisorState state;
|
||||
public:
|
||||
virtual String getStatePrefix() {
|
||||
return String("");
|
||||
}
|
||||
String printState() {
|
||||
switch (state) {
|
||||
case WorkspacesCreation:
|
||||
return String("WorkspacesCreation");
|
||||
case Preparation:
|
||||
return String("Preparation");
|
||||
case Execution:
|
||||
return String("Execution");
|
||||
case End:
|
||||
return String("End");
|
||||
default:
|
||||
return "?";
|
||||
}
|
||||
}
|
||||
//-
|
||||
void print() {
|
||||
for (auto& elem : this->getElements())
|
||||
elem->print();
|
||||
}
|
||||
|
||||
void init(const char* fileName, int recordSize) {
|
||||
state = WorkspacesCreation;
|
||||
File* packedTasks = new File(fileName);
|
||||
Text* lines = packedTasks->readLines();
|
||||
|
||||
const long length = lines->getLength() / recordSize;
|
||||
int offset = 0;
|
||||
for (int i = 0; i < length; ++i) {
|
||||
this->add(new T(lines, offset));
|
||||
offset += recordSize;
|
||||
}
|
||||
delete packedTasks;
|
||||
delete lines;
|
||||
}
|
||||
void changeState() {
|
||||
switch (this->state) {
|
||||
case WorkspacesCreation:
|
||||
this->state = Preparation;
|
||||
saveState();
|
||||
break;
|
||||
case Preparation:
|
||||
this->state = Execution;
|
||||
saveState();
|
||||
break;
|
||||
case Execution:
|
||||
this->state = End;
|
||||
saveState();
|
||||
break;
|
||||
default:
|
||||
this->state = End;
|
||||
break;
|
||||
}
|
||||
}
|
||||
void DoWithSchedule(int maxKernels) {
|
||||
saveState();
|
||||
saveProgress(0);
|
||||
// подготовка тестов
|
||||
while (this->state != Execution) {
|
||||
for (auto& task : this->getElements()) {
|
||||
switch (this->state) {
|
||||
case WorkspacesCreation:
|
||||
if (task->getState() == Waiting) {
|
||||
task->createWorkspace();
|
||||
task->setState(WorkspaceCreated);
|
||||
}
|
||||
break;
|
||||
case Preparation:
|
||||
if (task->getState() == WorkspaceCreated) {
|
||||
task->prepareWorkspace();
|
||||
task->createLaunchScript();
|
||||
task->setState(WorkspaceReady);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
//printf("id = %ld; state = %d\n", task->getId(), task->getState());
|
||||
break;
|
||||
}
|
||||
}
|
||||
changeState();
|
||||
}
|
||||
|
||||
map<int, queue<T*>, std::greater<int>> sortedByKernelNeeds;
|
||||
size_t activeTasks = 0;
|
||||
|
||||
for (auto& task : this->getElements()) {
|
||||
if (task->getState() == WorkspaceReady) {
|
||||
activeTasks++;
|
||||
sortedByKernelNeeds[task->getKernels()].push(task);
|
||||
}
|
||||
}
|
||||
|
||||
printf("total tasks count = %ld, active task count %ld, maxkernels %d\n", this->getLength(), activeTasks, maxKernels);
|
||||
|
||||
int busyKernels = 0;
|
||||
set<T*> activeTaskSet;
|
||||
bool ignoreCheck = true;
|
||||
|
||||
String pathRes("results");
|
||||
Utils::Mkdir(pathRes);
|
||||
string buf;
|
||||
|
||||
vector<int> emptyKeys;
|
||||
vector<T*> toDel;
|
||||
|
||||
size_t done = 0;
|
||||
//size_t step = activeTasks * 0.01; // step == 1%
|
||||
size_t step = ceil(activeTasks * 0.01); // step == 1%
|
||||
const double total = activeTasks;
|
||||
|
||||
while (activeTasks) {
|
||||
long oldActiveTasks = activeTasks;
|
||||
emptyKeys.clear();
|
||||
toDel.clear();
|
||||
|
||||
//ставим задачи от больших к меньшему по ядрам
|
||||
for (auto& elem : sortedByKernelNeeds) {
|
||||
int freeKernels = maxKernels - busyKernels;
|
||||
int kernelsNeeded = elem.first;
|
||||
|
||||
while (kernelsNeeded <= freeKernels && elem.second.size()) {
|
||||
T* task = elem.second.front();
|
||||
elem.second.pop();
|
||||
|
||||
activeTaskSet.insert(task);
|
||||
task->Start(ignoreCheck);
|
||||
#if DEB
|
||||
printf("start task with %d kernels and id %ld\n", task->getKernels(), task->getId());
|
||||
#endif
|
||||
busyKernels += task->getKernels();
|
||||
freeKernels = maxKernels - busyKernels;
|
||||
}
|
||||
|
||||
if (elem.second.size() == 0)
|
||||
emptyKeys.push_back(kernelsNeeded);
|
||||
|
||||
//если ядер не осталось, то нет смысла дальше смотреть
|
||||
if (freeKernels == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
// очищаем от пустых ключей
|
||||
for (auto& empty : emptyKeys)
|
||||
sortedByKernelNeeds.erase(empty);
|
||||
|
||||
// проверяем нет ли завершившихся задач
|
||||
for (auto& task : activeTaskSet)
|
||||
{
|
||||
if (task->Check()) {
|
||||
toDel.push_back(task);
|
||||
activeTasks--;
|
||||
done++;
|
||||
busyKernels -= task->getKernels();
|
||||
#if DEB
|
||||
printf(" done task with %d kernels and id %ld\n", task->getKernels(), task->getId());
|
||||
#endif
|
||||
buf += to_string(task->getId()) + " " + string(task->printState().getCharArray()) + " " + to_string(task->getTotalTime()) + "\n";
|
||||
task->copyResults(pathRes);
|
||||
}
|
||||
}
|
||||
|
||||
// очищаем завершенные задачи
|
||||
for (auto& del : toDel)
|
||||
activeTaskSet.erase(del);
|
||||
|
||||
if (oldActiveTasks != activeTasks) {
|
||||
#if DEB
|
||||
printf("done %ld / %d\n", done, this->getLength());
|
||||
#endif
|
||||
if ((done % step) == 0) {
|
||||
size_t persentDone = (done / total) * 100.0;
|
||||
saveProgress(persentDone);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
changeState();
|
||||
|
||||
String outFile(pathRes + "/" + getStatePrefix() + "Info.txt");
|
||||
File tmp(outFile, String(buf.c_str()));
|
||||
}
|
||||
void saveState() {
|
||||
String stateFile = packageWorkspace + "/state/" + getStatePrefix() + printState();
|
||||
File tmp(stateFile, Utils::getDate());
|
||||
}
|
||||
void saveProgress(long long persentDone) {
|
||||
FILE *f = fopen("progress", "w");
|
||||
if (f) {
|
||||
fprintf(f, "%lld", persentDone);
|
||||
fclose(f);
|
||||
}
|
||||
}
|
||||
};
|
||||
Reference in New Issue
Block a user