gestion/controle_adhesion/main.py

338 lines
13 KiB
Python
Executable File

#!/usr/bin/python3
# Libre en Communs's adhesion control program
# Copyright (C) 2022 Libre en Communs
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import base64
import csv
import datetime
import io
import json
import os
import quopri
import random
import re
import shutil
import string
import subprocess
import time
from typing import List

import dateutil.parser
import requests
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
# Program version string.
VERSION = "1.0.0"

# Files whose first line holds the API password for the gestion service and
# for the git forge, respectively.
GESTION_SECRET_FILE = "/home/secretaire/.secret/gestion_api_password"
GIT_SECRET_FILE = "/home/secretaire/.secret/git_api_password"

# Root of the working tree; setup_workdir() creates the "nouveau", "attente",
# "valide" and "refuse" sub-directories below it.
WORKDIR = "/srv/validation_pre_adhesion.d"

# Mail templates. They are opened by relative path, i.e. resolved against the
# current working directory of the process.
WELCOME_MAIL = "mail_bienvenue.txt"
WELCOME_MAIL_HEADERS = "mail_bienvenue_header.txt"
SUMMARY_MAIL = "mail_resume.txt"

# Location of the sendmail binary used to submit outgoing mail.
SENDMAIL_LOCATION = "/usr/sbin/sendmail"

# Lines of the summary mail; the workers append to it and main() sends it.
BUF = []

# Base URL of the git forge.
GITEA_URL = "https://forge.a-lec.org"
# gestion_read("SELECT * FROM services_users su \
# INNER JOIN membres m ON m.id = su.id_user \
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
# LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
# LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
# WHERE m.id = 3 AND l.id_account = 481;")
def gestion_get_secret():
    """Return the gestion API password.

    The password is the first line of GESTION_SECRET_FILE with its newline
    characters removed.
    """
    with open(GESTION_SECRET_FILE) as secret_file:
        first_line = secret_file.readline()
    return first_line.replace("\n", "")
def git_get_secret():
    """Return the git forge API token.

    The token is the first line of GIT_SECRET_FILE with its newline
    characters removed.
    """
    with open(GIT_SECRET_FILE) as secret_file:
        first_line = secret_file.readline()
    return first_line.replace("\n", "")
def get_file_content(filename):
    """Return the lines of *filename* as a list of strings.

    Each line keeps its trailing newline. The file is decoded as UTF-8
    explicitly so that the result does not depend on the locale of the
    process (the rest of the program encodes outgoing text as UTF-8).

    Raises:
        OSError: if the file cannot be opened.
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    with open(filename, encoding="utf-8") as sfile:
        return sfile.readlines()
def get_file_content_all(filename):
    """Return the whole content of *filename* as one string.

    The file is decoded as UTF-8 explicitly so that the result does not
    depend on the locale of the process.

    Raises:
        OSError: if the file cannot be opened.
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    with open(filename, encoding="utf-8") as sfile:
        return sfile.read()
def set_file_content(filename, lines):
    """Create *filename* and write *lines* to it.

    The file is opened in exclusive-creation mode, so an already existing
    file is never overwritten. Text is encoded as UTF-8 explicitly so that
    the result does not depend on the locale of the process. No newline is
    added between the items of *lines*.

    Returns:
        The result of ``writelines`` (``None``).

    Raises:
        FileExistsError: if *filename* already exists.
    """
    with open(filename, "x", encoding="utf-8") as sfile:
        return sfile.writelines(lines)
def gestion_read(req, timeout=30):
    """Run the SQL query *req* through the gestion HTTP API.

    Args:
        req: SQL statement sent as the request body.
        timeout: seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the program forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: on network failure or timeout.
        ValueError: if the response body is not valid JSON.
    """
    response = requests.post(
        'https://gestion.a-lec.org/api/sql/',
        auth=HTTPBasicAuth('api666', gestion_get_secret()),
        data=req,
        timeout=timeout,
    )
    return response.json()
def forgejo_post_issue(owner, repo, title, body, labels, timeout=30):
    """Open an issue in the *owner*/*repo* repository of the git forge.

    The API token is sent in the ``Authorization`` header rather than in the
    URL query string, so that it does not end up in server access logs or in
    the text of connection-error exceptions.

    Args:
        owner: owner (user or organisation) of the repository.
        repo: repository name.
        title: issue title.
        body: issue text.
        labels: list of label ids to attach to the issue.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The ``requests.Response`` of the creation request; the caller is
        responsible for checking whether it succeeded.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    url = "{}/api/v1/repos/{}/{}/issues".format(GITEA_URL, owner, repo)
    import_response: requests.Response = requests.post(
        url,
        headers={"Authorization": "token {}".format(git_get_secret())},
        data={
            "body": body,
            "labels": labels,
            "title": title,
        },
        timeout=timeout,
    )
    return import_response
def gestion_adduser(req, timeout=30):
    """Send the CSV document *req* to the gestion user-import endpoint.

    Args:
        req: CSV text (header line plus data lines) sent as the request body.
        timeout: seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the program forever.

    Returns:
        The raw response body as bytes. Callers treat a non-empty body as an
        error report.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    response = requests.put(
        'https://gestion.a-lec.org/api/user/import',
        auth=HTTPBasicAuth('api666', gestion_get_secret()),
        data=req,
        timeout=timeout,
    )
    return response.content
def setup_workdir():
    """Create WORKDIR and its four state sub-directories when missing.

    The sub-directories are "nouveau", "attente", "valide" and "refuse".
    A sub-directory is created only when no entry of that name is listed
    in WORKDIR.
    """
    if not os.path.isdir(WORKDIR):
        os.mkdir(WORKDIR)

    for state in ("nouveau", "attente", "valide", "refuse"):
        if state not in os.listdir(WORKDIR):
            os.mkdir(WORKDIR + "/" + state)
def sendmail(headers, data):
    """Submit a mail through the local sendmail binary.

    *headers* is the header block as text; *data* is the body text, which is
    encoded as UTF-8 and then quoted-printable. The recipients are taken
    from the headers by sendmail (``-t``), and ``-oi`` is passed so that a
    line holding a single dot does not end the message. The headers are
    also printed on standard output.
    """
    print("***\nHEADERS: {}\n***\n".format(headers))
    header_bytes = (headers + "\n").encode("utf-8")
    body_bytes = quopri.encodestring(data.encode("utf-8"))
    subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"],
                   input=header_bytes + body_bytes)
# Column names of the CSV document sent to the gestion user-import endpoint.
_IMPORT_COLUMNS = (
    "numero", "date_adhesion", "statut_juridique", "siren_rna", "nom",
    "email", "mail_redirs", "adresse", "code_postal", "ville", "pays",
    "telephone", "notes", "Catégorie membre",
)

# Number of ';'-separated fields expected after the '|' of a registration.
_REGISTRATION_FIELD_COUNT = 11


def _sql_quote(value):
    """Return *value* as a single-quoted SQL string literal ('' escapes ')."""
    return "'" + str(value).replace("'", "''") + "'"


def _parse_registration(path):
    """Read a registration file and return ``(numcode, fields)``.

    Only the first line is used; it has the form
    ``numcode|name;firstname;addr;postalcode;city;email;attribmail;xmpp;``
    ``tarif;payment;redir``.

    Raises:
        IndexError: if the file is empty.
        ValueError: if the line does not have the expected shape, or if
            *numcode* cannot safely be used as a file name.
    """
    # Drop the line terminator so that it does not leak into the last field.
    first_line = get_file_content(path)[0].rstrip("\r\n")
    numcode, content = first_line.split("|")
    fields = content.split(";")
    if len(fields) != _REGISTRATION_FIELD_COUNT:
        raise ValueError("unexpected number of fields")
    # numcode becomes a file name below WORKDIR/attente: refuse anything
    # that would point outside of that directory.
    if numcode in ("", ".", "..") or "/" in numcode:
        raise ValueError("unusable registration code")
    return numcode, fields


def _lookup_member_numbers(email, full_name):
    """Query the members whose email and name match.

    Returns:
        ``(rows, problem)``: *rows* is the list of result rows, or ``None``
        when the query failed, in which case *problem* is the exception or
        the unexpected answer, suitable for the summary.
    """
    request = "SELECT numero FROM membres WHERE email = {} and nom = {}".format(
        _sql_quote(email), _sql_quote(full_name))
    try:
        answer = gestion_read(request)
    except (requests.RequestException, ValueError) as err:
        return None, err
    if isinstance(answer, dict) and isinstance(answer.get("results"), list):
        return answer["results"], None
    return None, answer


def _build_import_csv(row):
    """Return the CSV import document (header line + *row*), fully quoted.

    The csv module takes care of escaping double quotes inside values. The
    document has no trailing newline.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerow(_IMPORT_COLUMNS)
    writer.writerow(row)
    return buffer.getvalue().rstrip("\n")


def new_registrations_worker():
    """Import every registration found in WORKDIR/nouveau.

    For each registration file the worker:

    1. parses the first line of the file;
    2. skips the registration when a member with the same email and name
       already exists;
    3. creates the member through the gestion import endpoint;
    4. checks that the member now exists, then writes
       ``numero;email;redir;xmpp`` to WORKDIR/attente/<numcode> and removes
       the file from WORKDIR/nouveau.

    A line describing each step that needs attention is appended to BUF.
    Files that cannot be processed are reported and left in place; they do
    not stop the processing of the remaining files.
    """
    incoming_dir = WORKDIR + "/nouveau"
    entries = os.listdir(incoming_dir)
    if not entries:
        return

    BUF.append("*** Nouvelles adhésions ***")
    BUF.append("{} nouvelle(s) adhésion(s) reçue(s)".format(len(entries)))

    # Parse each new registration
    for entry in entries:
        registration = incoming_dir + "/" + entry
        if not os.path.isfile(registration):
            BUF.append("{} n'est pas un fichier".format(registration))
            continue

        try:
            numcode, fields = _parse_registration(registration)
        except (IndexError, ValueError):
            BUF.append("{} : format de fichier invalide".format(registration))
            continue
        (reg_name, reg_firstname, reg_addr, reg_postalcode, reg_city,
         reg_email, reg_attribmail, reg_xmpp, reg_tarif, reg_payment,
         reg_redir) = fields
        full_name = "{} {}".format(reg_name, reg_firstname)

        # Write summary
        BUF.append("\n* {}\n".format(numcode))
        BUF.append("Nom: {} {}\nCode postal: {}\n".format(
            reg_name, reg_firstname, reg_postalcode))

        # Check if exists
        rows, problem = _lookup_member_numbers(reg_attribmail, full_name)
        if rows is None:
            BUF.append("Erreur inconnue d'enregistrement : {}".format(problem))
            continue
        if rows:
            BUF.append("Utilisateur déjà existant !")
            continue

        # Create this new member
        request = _build_import_csv([
            "",
            datetime.datetime.now().strftime("%d/%m/%Y"),
            "Personne physique",
            "",
            full_name,
            reg_attribmail,
            reg_email,
            reg_addr,
            reg_postalcode,
            reg_city,
            "FR",
            "",
            "Adhesion {}, {} par {}".format(numcode, reg_tarif, reg_payment),
            "",
        ])
        try:
            answer = gestion_adduser(request)
        except requests.RequestException as err:
            BUF.append("Erreur : {}".format(err))
            continue
        if answer:
            # A non-empty body is reported, but the existence check below
            # remains the authority on whether the import worked.
            BUF.append("Erreur : {}".format(answer))

        # Check if exists now
        rows, problem = _lookup_member_numbers(reg_attribmail, full_name)
        if rows is None:
            BUF.append("Erreur inconnue : {}".format(problem))
            continue
        if not rows:
            BUF.append("Inscription échouée pour une raison inconnue...")
            continue

        # Register new member to wait for secretary
        numero = rows[-1]['numero']
        pending = WORKDIR + "/attente/" + numcode
        try:
            # set_file_content() refuses to overwrite an existing file.
            os.remove(pending)
        except FileNotFoundError:
            pass
        set_file_content(
            pending,
            ["{};{};{};{}".format(numero, reg_email, reg_redir, reg_xmpp)])

        # Cleanup
        os.remove(registration)
def _open_ticket(repo, title, body):
    """Open an issue in the cominfra/<repo> repository of the forge.

    Returns:
        ``(created, detail)``: *created* tells whether the forge answered
        with a successful status and a JSON object holding an ``id`` key;
        *detail* is the response, or the exception when the request itself
        failed, suitable for the summary.
    """
    # NOTE(review): 970 is the label id attached to every ticket; its
    # meaning is defined on the forge side - confirm it is still valid.
    labels = [970]
    try:
        response = forgejo_post_issue("cominfra", repo, title, body, labels)
    except requests.RequestException as err:
        return False, err
    if not response.ok:
        return False, response
    try:
        payload = response.json()
    except ValueError:
        return False, response
    # Test the key of the decoded object: a substring test on the text of
    # the answer would also match error messages such as "invalid".
    return isinstance(payload, dict) and "id" in payload, response


def validate_registrations_worker():
    """Process the registrations waiting in WORKDIR/attente.

    Each file there is named after the registration code and holds
    ``numero;email;redir;xmpp`` on its first line. For each file the member
    is looked up by number:

    * no such member: the registration is moved to WORKDIR/refuse;
    * member in category 2 or 10: nothing happens yet (still pending);
    * otherwise the member is considered validated: a mail ticket (and an
      XMPP ticket when requested) is opened on the forge, the welcome mail
      is sent and the registration is moved to WORKDIR/valide.

    Summary lines are first collected in a local buffer and only copied to
    BUF when something worth reporting happens, so that a run in which every
    registration is still pending does not produce a summary on its own.
    Lines collected for still-pending members stay in the local buffer and
    are emitted with the next reported event, if any.
    """
    pending_dir = WORKDIR + "/attente"
    entries = os.listdir(pending_dir)
    if not entries:
        return

    prebuf = [
        "\n*** Adhésions en validation ***",
        "{} nouvelle(s) adhésion(s) reçue(s)".format(len(entries)),
    ]

    def _flush():
        # Move the lines collected so far into the summary.
        BUF.extend(prebuf)
        prebuf.clear()

    if BUF:
        # A summary will be sent anyway: include the section header.
        _flush()

    # Parse each pending registration
    for numcode in entries:
        registration = pending_dir + "/" + numcode
        if not os.path.isfile(registration):
            _flush()
            BUF.append("{} n'est pas un fichier".format(registration))
            continue

        try:
            # Drop the line terminator so that it cannot end up in is_xmpp.
            numero, email, is_redir, is_xmpp = \
                get_file_content(registration)[0].rstrip("\r\n").split(";")
        except (IndexError, ValueError):
            _flush()
            BUF.append("{} : format de fichier invalide".format(registration))
            continue

        # Write summary
        prebuf.append("\n* {}\n".format(numcode))

        # Gather information (one query, reused below)
        request = "SELECT * FROM membres WHERE numero = '{}'".format(
            numero.replace("'", "''"))
        answer = None
        try:
            answer = gestion_read(request)
            results = answer['results']
        except (requests.RequestException, ValueError,
                KeyError, TypeError) as err:
            _flush()
            BUF.append("Erreur inconnue : {}".format(
                answer if answer is not None else err))
            continue

        # Check status
        if len(results) == 0:
            _flush()
            BUF.append("Validation échouée : membre refusé.")
            # Register new member to be refused; os.replace overwrites a
            # file left there by an earlier run.
            os.replace(registration, WORKDIR + "/refuse/" + numcode)
            continue

        member = results[-1]
        prebuf.append(("Numéro de membre: {}\nNom: {}\nCode postal: {}").format(
            numero,
            member['nom'],
            member['code_postal']))
        prebuf.append(("Catégorie: {}").format(member['id_category']))

        # Check category (validated or not !)
        if member['id_category'] in (2, 10):
            continue

        _flush()
        BUF.append("=> MEMBRE VALIDE PAR SECRETARIAT <=")

        # Launch git tickets
        title = "Création d'un compte courriel membre n°{}".format(numero)
        body = "Bonjour,\n Une demande de création de compte de " \
            + "courriel ou une redirection a été émise.\n\n" \
            + "Compte de courriel actuel : {} \n".format(email) \
            + "Compte de courriel désiré : {} \n".format(member['email']) \
            + "Redirection ('on' si oui, vide si non) : {} \n".format(is_redir)
        created, detail = _open_ticket("mail", title, body)
        if not created:
            BUF.append(
                "Validation échouée : ticket git MAIL en erreur : {}".format(
                    detail))
            continue
        BUF.append("Ticket courriel ouvert.")

        if is_xmpp == "on":
            title = "Création d'un compte XMPP membre n°{}".format(numero)
            body = "Bonjour,\n Une demande de création de compte XMPP " \
                + "a été émise.\n\n" \
                + "Compte de courriel actuel : {} \n".format(email) \
                + "Compte de XMPP désiré : {} \n".format(member['email'])
            created, detail = _open_ticket("xmpp", title, body)
            if not created:
                # NOTE(review): the registration stays in "attente", so the
                # mail ticket opened above is opened again on the next run.
                BUF.append(
                    "Validation échouée : ticket git XMPP en erreur : {}".format(
                        detail))
                continue
            BUF.append("Ticket XMPP ouvert.")

        # Launch welcoming mail
        mailtextheaders = get_file_content_all(WELCOME_MAIL_HEADERS)
        mailtext = get_file_content_all(WELCOME_MAIL)
        mailtextheaders = mailtextheaders.replace("COURRIEL_INSCRIPTION", email)
        mailtext = mailtext.replace("NUMERO_MEMBRE", numero)
        mailtext = mailtext.replace("COURRIEL_MEMBRE", member['email'])
        mailtext = mailtext.replace("COURRIEL_INSCRIPTION", email)
        mailtext = mailtext.replace("NOM_MEMBRE", member['nom'])
        sendmail(mailtextheaders, mailtext)

        # Register new member to be validated; the target path is built
        # explicitly instead of substituting "attente" anywhere in the path.
        os.replace(registration, WORKDIR + "/valide/" + numcode)
def main():
    """Run both workers, then mail the summary when there is one.

    The summary headers are read from SUMMARY_MAIL. Every line collected in
    BUF is printed on standard output and becomes one line of the mail body;
    no mail is sent when BUF is empty.
    """
    setup_workdir()
    new_registrations_worker()
    validate_registrations_worker()

    # End of work: build and launch the summary mail
    mailheaders = get_file_content_all(SUMMARY_MAIL) + "\n"
    body_parts = []
    for entry in BUF:
        body_parts.append(entry + "\n")
        print(entry)

    if body_parts:
        sendmail(mailheaders, "".join(body_parts))
# Bootstrap: run the workflow only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()