gem-graph-server/src/scheduler.pyx

136 lines
4.3 KiB
Cython
Raw Normal View History

2021-04-09 18:01:29 +02:00
import multiprocessing, threading
import time
import random
# Number of arrows generated into ARROW_LIST at import time.
ARROW_NUMBER = 150

# Cycle limit handed to the scheduler as ``nmax_cycles``.
MAX_CYCLES = 20000

# Number of addresses in the global space; arrow addresses are drawn
# from ``0 .. SPACE_SIZE - 1``.
SPACE_SIZE = 10000

# Upper bound on simultaneously tracked workers (the scheduler's ``n_thread``).
MAX_THREAD = 300

# Placeholder forwarded unchanged to every LocalThread as ``transitions``.
TRANSITIONS_TREE = None

# One flag per address. The scheduler only starts a worker on an address
# whose flag is True, sets it to False while the worker runs, and sets it
# back to True once the worker has been collected.
PREEMPTION_GLOBAL_SPACE = [True] * SPACE_SIZE

# Arrows are ``(address, orientation)`` pairs: a random address and an
# orientation that is always 0 here.
ARROW_LIST = [
    (random.randint(0, SPACE_SIZE - 1), 0)
    for _ in range(ARROW_NUMBER)
]
class CurrentlyComputing(Exception):
    """Result marker meaning "no outcome recorded yet".

    LocalThread stores an instance of this class as its initial
    ``returncode``; the value is overwritten when ``run`` finishes.
    """
class SuccessfulOperation(Exception):
    """Result marker stored as ``returncode`` when a worker's run completes
    without raising."""
class FailedOperation(Exception):
    """Result marker for a worker that failed in a way the scheduler tolerates.

    The scheduler only prints a notice when it finds this as a worker's
    ``returncode``; any other non-success value is re-raised.
    """
class LocalThread(multiprocessing.Process):
    """Worker process attached to one arrow.

    Despite its name this is a ``multiprocessing.Process``. Its outcome is
    published through ``self.namespace.returncode``: initially a
    ``CurrentlyComputing`` instance, then either ``SuccessfulOperation`` or
    the exception that interrupted ``run``.

    Args:
        id: Identifier used in the log messages.
        shared_memory: Object exposing ``Namespace()``; the scheduler passes
            a ``multiprocessing.Manager()``.
        address: Address of the arrow this worker operates on.
        orientation: Orientation of the arrow (stored, not used here).
        transitions: Transition data (stored, not used here).
    """

    def __init__(self, id, shared_memory, address, orientation, transitions):
        super().__init__()
        self.id, self.address = id, address
        self.orientation, self.transitions = orientation, transitions
        # The result slot starts in the "still computing" state.
        result_slot = shared_memory.Namespace()
        self.namespace = result_slot
        result_slot.returncode = CurrentlyComputing()

    def run(self):
        """Execute the workload and record the outcome in the namespace."""
        try:
            # Actual work (currently a placeholder computation).
            print("Thread local n°{} parle depuis {} !".format(self.id, self.address))
            product = 1
            for factor in range(1000):
                product = product * factor
            self.namespace.returncode = SuccessfulOperation()
        except Exception as error:
            # Any failure is reported to the parent instead of being raised.
            self.namespace.returncode = error
class GreatScheduler(threading.Thread):
    """Scheduler thread that elects arrows at random and runs a worker on each.

    On every cycle one arrow is picked from ``arrow_list``. If the worker
    limit is not reached and the arrow's address is free in
    ``preemption_space``, the address is reserved and a ``LocalThread`` is
    started on it. Finished workers are then collected and their address
    released.

    Args:
        preemption_space: Mutable sequence of booleans indexed by address;
            True means the address is free. It is modified in place.
        arrow_list: Sequence of arrows; item ``[0]`` is the address and
            item ``[1]`` the orientation.
        n_thread: Maximum number of workers tracked at the same time.
        nmax_cycles: The loop stops once ``n_cycle`` equals this value.

    Attributes:
        cur_id: Identifier of the last worker created (-1 before the first).
        n_cycle: Index of the current cycle (-1 before the first).
        stopped: Set to True from another thread to request termination.
    """

    def __init__(self, preemption_space, arrow_list, n_thread, nmax_cycles):
        threading.Thread.__init__(self)
        self.preemption_space = preemption_space
        self.arrow_list = arrow_list
        self.n_thread = n_thread
        self.cur_id = -1
        self.n_cycle = -1
        self.nmax_cycles = nmax_cycles
        self.stopped = False

    def _finalize_worker(self, worker):
        """Check a finished worker's outcome and release its address.

        Raises:
            Exception: Whatever the worker stored as ``returncode`` when it
                is neither ``SuccessfulOperation`` nor ``FailedOperation``.
        """
        # Read the proxy once: each attribute access is a round trip.
        outcome = worker.namespace.returncode
        if not isinstance(outcome, SuccessfulOperation):
            if isinstance(outcome, FailedOperation):
                print("A optimiser : créer une règle")
            else:
                raise outcome
        print("Thread local n°{} est terminé".format(worker.id))
        self.preemption_space[worker.address] = True

    def run(self):
        """Run scheduling cycles until stopped or the cycle limit is reached.

        Raises:
            Exception: Propagated from ``_finalize_worker`` when a worker
                ended with an unexpected outcome.
        """
        if not self.arrow_list:
            # Nothing can ever be elected; avoid an IndexError on the
            # first cycle.
            print("Liste de flèches vide : rien à ordonnancer")
            return

        running = []
        # The context manager shuts the manager process down on exit,
        # including when a worker failure is re-raised.
        with multiprocessing.Manager() as shared_memory:
            while not self.stopped and self.n_cycle != self.nmax_cycles:
                self.n_cycle += 1

                # Draw one arrow at random.
                elected_arrow = random.choice(self.arrow_list)
                address = elected_arrow[0]

                # Start a worker only if a slot is available and the
                # address is not already reserved.
                if len(running) < self.n_thread and self.preemption_space[address]:
                    print("Espace local libre à {}".format(address))
                    self.preemption_space[address] = False

                    self.cur_id += 1
                    worker = LocalThread(
                        self.cur_id,
                        shared_memory,
                        address,
                        elected_arrow[1],
                        TRANSITIONS_TREE,
                    )
                    running.append(worker)
                    worker.start()

                # Collect finished workers. A new list is built rather than
                # removing items from the list being iterated, which would
                # skip the element following each removal.
                still_running = []
                for worker in running:
                    if worker.is_alive():
                        still_running.append(worker)
                    else:
                        self._finalize_worker(worker)
                running = still_running

            # Scheduler exit: wait for every remaining worker first, so
            # that a failure in one does not leave the others unjoined.
            print("Attente de la fin des thread locaux")
            for worker in running:
                worker.join()
            for worker in running:
                self._finalize_worker(worker)
class ServerCLI:
    """Minimal blocking console used to decide when the server should stop."""

    @staticmethod
    def start():
        """Block until the operator enters a non-empty line.

        Empty lines are ignored. End of input (closed stdin) and Ctrl-C are
        treated as a stop request, so the caller can still run its shutdown
        logic instead of dying on an uncaught exception.

        Returns:
            None
        """
        while True:
            try:
                if input():
                    return
            except (EOFError, KeyboardInterrupt):
                return
# Forcing the 'fork' start method is left disabled, as in the original:
# multiprocessing.set_start_method('fork')

# Start the scheduler in the background, then block on the console.
scheduler = GreatScheduler(PREEMPTION_GLOBAL_SPACE, ARROW_LIST, MAX_THREAD, MAX_CYCLES)
scheduler.start()
try:
    ServerCLI.start()
finally:
    # Always request termination, even if the console raised; otherwise
    # the scheduler would keep running until its cycle limit.
    scheduler.stopped = True