Multiprocessing!

This commit is contained in:
Jean Sirmai 2021-04-09 18:01:29 +02:00
parent 0153fc300c
commit bfb625d2c0
Signed by untrusted user who does not match committer: jean
GPG Key ID: FB3115C340E057E3
5 changed files with 8277 additions and 0 deletions

23
src/Makefile Normal file
View File

@ -0,0 +1,23 @@
.PHONY: all scheduler clean test
CCOPTS=-pthread -fPIC -fwrapv -Wall -fno-strict-aliasing
CCFLAGS=-I /usr/include/python3.9
LDFLAGS=-lpython3.9 -lpthread -lm -lutil
scheduler: scheduler.c
gcc $(CCOPTS) $(CCFLAGS) -o scheduler scheduler.c $(LDFLAGS)
scheduler.c: scheduler.pyx
cython3 scheduler.pyx --embed
clean:
rm -rf scheduler.c scheduler
rm -rf test.c test
all: scheduler
test: test.c
gcc $(CCOPTS) $(CCFLAGS) -o test test.c $(LDFLAGS)
test.c: test.pyx
cython3 test.pyx --embed

BIN
src/scheduler Executable file

Binary file not shown.

8100
src/scheduler.c Normal file

File diff suppressed because it is too large Load Diff

136
src/scheduler.pyx Normal file
View File

@ -0,0 +1,136 @@
import multiprocessing, threading
import time
import random
ARROW_NUMBER = 150
MAX_CYCLES = 20000
SPACE_SIZE = 10000
PREEMPTION_GLOBAL_SPACE = [True] * SPACE_SIZE
ARROW_LIST = [(random.randint(0,SPACE_SIZE - 1),0) for x in range(ARROW_NUMBER)]
TRANSITIONS_TREE = None
MAX_THREAD = 300
class CurrentlyComputing(Exception):
pass
class SuccessfulOperation(Exception):
pass
class FailedOperation(Exception):
pass
class LocalThread(multiprocessing.Process):
def __init__(self, id, shared_memory, address, orientation, transitions):
multiprocessing.Process.__init__(self)
self.id = id
self.address = address
self.orientation = orientation
self.transitions = transitions
self.namespace = shared_memory.Namespace()
self.namespace.returncode = CurrentlyComputing()
def run(self):
try:
# Actual code
print("Thread local n°{} parle depuis {} !".format(self.id, self.address))
n = 1
for i in range(1000):
n *= i
self.namespace.returncode = SuccessfulOperation()
except Exception as exception:
self.namespace.returncode = exception
class GreatScheduler(threading.Thread):
def __init__(self, preemption_space, arrow_list, n_thread, nmax_cycles):
threading.Thread.__init__(self)
self.preemption_space = preemption_space
self.arrow_list = arrow_list
self.n_thread = n_thread
self.cur_id = -1
self.n_cycle = -1
self.nmax_cycles = nmax_cycles
self.stopped = False
def run(self):
thread_list = []
shared_memory = multiprocessing.Manager()
while not self.stopped and self.n_cycle != self.nmax_cycles:
self.n_cycle += 1
#print("--- Mesures et écoute ---")
#print("Espace global de préemption :\t {}".format(self.preemption_space))
#print("Liste des flèches :\t {}".format(self.arrow_list))
## Tirer une flêche au hasard
n = int(random.random()*len(self.arrow_list))
elected_arrow = self.arrow_list[n]
## Vérification de l'espace global de préemption autour de la flêche
if not(len(thread_list) >= self.n_thread) and self.preemption_space[elected_arrow[0]]:
print("Espace local libre à {}".format(elected_arrow[0]))
self.preemption_space[elected_arrow[0]] = False
## Création du thread local
self.cur_id += 1
thread_list.append(
LocalThread(self.cur_id,
shared_memory,
elected_arrow[0],
elected_arrow[1],
TRANSITIONS_TREE
)
)
## Lancer le thread local
thread_list[-1].start()
for thread in thread_list:
if not thread.is_alive():
if not thread.namespace.returncode.__class__ is SuccessfulOperation:
if thread.namespace.returncode.__class__ is FailedOperation:
print("A optimiser : créer une règle")
else:
raise thread.namespace.returncode
print("Thread local n°{} est terminé".format(thread.id))
self.preemption_space[thread.address] = True
thread_list.pop(thread_list.index(thread))
## Sortie du scheduler
print("Attente de la fin des thread locaux")
for thread in thread_list:
thread.join()
if not thread.namespace.returncode.__class__ is SuccessfulOperation:
if thread.namespace.returncode.__class__ is FailedOperation:
print("A optimiser : créer une règle")
else:
raise thread.namespace.returncode
print("Thread local n°{} est terminé".format(thread.id))
class ServerCLI:
def start():
stopped = False
while not stopped:
stopped = input()
return
#multiprocessing.set_start_method('fork')
scheduler = GreatScheduler(PREEMPTION_GLOBAL_SPACE, ARROW_LIST, MAX_THREAD, MAX_CYCLES)
scheduler.start()
ServerCLI.start()
scheduler.stopped = True

18
src/test.pyx Normal file
View File

@ -0,0 +1,18 @@
from threading import Thread
import time
def countdown():
n = 50
while n > 0:
n -= 1
COUNT = 10000000
with nogil:
t1 = Thread(target=countdown)
t2 = Thread(target=countdown)
start = time.time()
t1.start();t2.start()
t1.join();t2.join()
end = time.time()
print end-start