#=----------------------------------------------------------------------------=#
#  Scheduler related functions                                                 #
#                                                                              #
#  Copyright © 2021 The Gem-graph Project                                      #
#                                                                              #
#  This file is part of gem-graph.                                             #
#                                                                              #
#  This program is free software: you can redistribute it and/or modify        #
#  it under the terms of the GNU Affero General Public License as              #
#  published by the Free Software Foundation, either version 3 of the          #
#  License, or (at your option) any later version.                             #
#                                                                              #
#  This program is distributed in the hope that it will be useful,             #
#  but WITHOUT ANY WARRANTY; without even the implied warranty of              #
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
#  GNU Affero General Public License for more details.                         #
#                                                                              #
#  You should have received a copy of the GNU Affero General Public License    #
#  along with this program.  If not, see https://www.gnu.org/licenses/.        #
#=----------------------------------------------------------------------------=#

import multiprocessing
import os
import random
import threading

import localthread


## Master Thread
class GreatScheduler(threading.Thread):
    """Master thread that elects arrows at random and spawns local workers.

    On every cycle one arrow is drawn from ``arrow_list``.  If the entry of
    ``preemption_space`` indexed by the arrow's first element is truthy and
    the worker limit is not reached, that entry is set to ``False`` and a
    ``localthread.LocalComputingUnit`` is started for the arrow.  Workers
    that have finished are then reaped and their entry is set back to
    ``True``.
    """

    def __init__(self, preemption_space, transitions_tree, arrow_list,
                 n_thread, nmax_cycles):
        """Store the scheduling parameters; the thread is not started here.

        Args:
            preemption_space: Container indexed by ``arrow[0]`` whose truthy
                entries mean "free".  It is mutated in place by ``run``.
            transitions_tree: Passed unchanged to every local computing unit.
            arrow_list: Indexable collection of arrows.  ``arrow[0]`` is used
                as the address, ``arrow[1]`` is forwarded to the worker.
            n_thread: Maximum number of simultaneous workers.  ``0`` means
                "no explicit limit": three times the usable CPU count.
            nmax_cycles: ``run`` leaves its main loop once ``n_cycle`` equals
                this value.
        """
        threading.Thread.__init__(self)
        self.preemption_space = preemption_space
        self.transition_tree = transitions_tree
        self.arrow_list = arrow_list

        if n_thread == 0:
            # No limit: use the number of usable CPUs multiplied by 3.
            try:
                usable_cpus = len(os.sched_getaffinity(0))
            except AttributeError:
                # os.sched_getaffinity is not provided on every platform;
                # fall back to the total CPU count (at least one).
                usable_cpus = os.cpu_count() or 1
            self.n_thread = usable_cpus * 3
        else:
            self.n_thread = n_thread

        self.cur_id = -1     # id given to the last worker created
        self.n_cycle = -1    # index of the current scheduling cycle
        self.nmax_cycles = nmax_cycles
        self.stopped = False  # set to True from outside to end the loop

    def _report_result(self, thread):
        """Inspect the return code of a finished worker and announce its end.

        A return code whose class is ``localthread.SuccessfulOperation`` is
        accepted silently, ``localthread.FailedOperation`` only prints a
        hint, and anything else is raised as-is.

        Raises:
            Whatever object the worker stored as return code when it is
            neither a successful nor a failed operation.
        """
        # Read the shared value once so every check sees the same object.
        returncode = thread.namespace.returncode
        outcome = returncode.__class__

        # Exact class comparison on purpose: subclasses are not accepted.
        if outcome is not localthread.SuccessfulOperation:
            if outcome is localthread.FailedOperation:
                print("A optimiser : créer une règle")
            else:
                raise returncode

        print("Thread local n°{} est terminé".format(thread.id))

    def run(self):
        """Run scheduling cycles, then wait for the remaining workers.

        The main loop ends when ``self.stopped`` becomes true or when
        ``self.n_cycle`` reaches ``self.nmax_cycles``.
        """
        thread_list = []
        shared_memory = multiprocessing.Manager()

        while not self.stopped and self.n_cycle != self.nmax_cycles:
            self.n_cycle += 1

            # Disabled debug traces:
            #print("--- Mesures et écoute ---")
            #print("Espace global de préemption :\t {}".format(self.preemption_space))
            #print("Liste des flèches :\t {}".format(self.arrow_list))

            ## Draw an arrow at random
            n = int(random.random() * len(self.arrow_list))
            elected_arrow = self.arrow_list[n]

            ## Check the global preemption space around the arrow
            if (len(thread_list) < self.n_thread
                    and self.preemption_space[elected_arrow[0]]):
                print("Espace local libre à {}".format(elected_arrow[0]))
                # Lock the location until the worker is reaped.
                self.preemption_space[elected_arrow[0]] = False

                ## Create the local thread
                self.cur_id += 1
                worker = localthread.LocalComputingUnit(
                    self.cur_id,
                    shared_memory,
                    elected_arrow[0],
                    elected_arrow[1],
                    self.transition_tree,
                )
                thread_list.append(worker)

                ## Start the local thread
                worker.start()

            # Reap finished workers.  A new list is built instead of popping
            # from thread_list while iterating over it, which would skip the
            # element following each removed worker.
            still_running = []
            for thread in thread_list:
                if thread.is_alive():
                    still_running.append(thread)
                    continue
                self._report_result(thread)
                self.preemption_space[thread.address] = True
            thread_list = still_running

        ## Scheduler exit
        print("Attente de la fin des thread locaux")
        for thread in thread_list:
            thread.join()
            # NOTE(review): the preemption space entry is not released here,
            # unlike in the main loop — kept as in the original; confirm
            # whether callers expect it to be freed.
            self._report_result(thread)