#!/usr/bin/python3
|
|
|
|
# Libre en Communs's membership control program
|
|
# Copyright (C) 2022 Libre en Communs
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import os, requests, json, datetime, shutil, quopri, subprocess, base64, os
|
|
import time, random, string, dateutil.parser, re, pygitea
|
|
from typing import List
|
|
from requests.auth import HTTPBasicAuth
|
|
from requests.auth import HTTPDigestAuth
|
|
|
|
|
|
# Program version.
VERSION = "1.0.0"

# Files whose first line holds the API passwords.
GESTION_SECRET_FILE = "/home/secretaire/.secret/gestion_api_password"
GIT_SECRET_FILE = "/home/secretaire/.secret/git_api_password"

# Directory holding the registration queues
# (sub-directories "nouveau", "attente", "valide" and "refuse").
WORKDIR = "/srv/validation_pre_adhesion.d"

# Mail templates (relative paths, resolved against the current directory).
WELCOME_MAIL = "mail_bienvenue.txt"
WELCOME_MAIL_HEADERS = "mail_bienvenue_header.txt"
SUMMARY_MAIL = "mail_resume.txt"

SENDMAIL_LOCATION = "/usr/sbin/sendmail"  # sendmail location

# Lines of the summary report accumulated by the workers during a run.
BUF = []

# Base URL of the forge on which tickets are opened.
GITEA_URL = "https://forge.a-lec.org"
|
|
|
|
# gestion_read("SELECT * FROM services_users su \
|
|
# INNER JOIN membres m ON m.id = su.id_user \
|
|
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
|
|
# LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
|
|
# LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
|
|
# WHERE m.id = 3 AND l.id_account = 481;")
|
|
|
|
def gestion_get_secret():
    """Return the first line of GESTION_SECRET_FILE with its newline removed."""
    with open(GESTION_SECRET_FILE) as secret_file:
        first_line = secret_file.readline()
    return first_line.replace("\n", "")
|
|
|
|
def git_get_secret():
    """Return the first line of GIT_SECRET_FILE with its newline removed."""
    with open(GIT_SECRET_FILE) as secret_file:
        token_line = secret_file.readline()
    token = token_line.replace("\n", "")
    return token
|
|
|
|
def get_file_content(filename):
    """Return the lines of *filename* as a list, line endings included."""
    with open(filename) as handle:
        # Iterating a text file yields the same items as readlines().
        return list(handle)
|
|
|
|
def get_file_content_all(filename):
    """Return the whole text of *filename* as a single string."""
    with open(filename) as handle:
        text = handle.read()
    return text
|
|
|
|
def set_file_content(filename, lines):
    """Create *filename* and write *lines* into it.

    The file is opened in exclusive-creation mode ("x"), so FileExistsError
    is raised when *filename* already exists.  The value returned is whatever
    writelines() returns.
    """
    with open(filename, mode="x") as handle:
        outcome = handle.writelines(lines)
    return outcome
|
|
|
|
def gestion_read(req, timeout=60):
    """Send the SQL statement *req* to the gestion API and return its JSON.

    Parameters:
        req: SQL text (str) or already-encoded bytes, sent as the POST body.
        timeout: seconds given to requests before the call is abandoned;
            without it a stalled server would block the run forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException on network failure or timeout,
        and a ValueError subclass when the body is not valid JSON.
    """
    # Encode explicitly: how a str body is encoded is otherwise left to the
    # HTTP stack, which may fall back to Latin-1 and mangle accented names.
    payload = req.encode("utf-8") if isinstance(req, str) else req
    response = requests.post(
        'https://gestion.a-lec.org/api/sql/',
        auth=HTTPBasicAuth('api666', gestion_get_secret()),
        data=payload,
        timeout=timeout,
    )
    return response.json()
|
|
|
|
def forgejo_post_issue(owner, repo, title, body, labels):
    """Open an issue in the *owner*/*repo* repository of the forge.

    Parameters:
        owner, repo: path components of the target repository.
        title, body: issue title and text.
        labels: list sent as the "labels" field of the request.

    Returns:
        The ``ok`` flag of the HTTP response, i.e. True, since a failed
        request raises instead of returning.

    Raises:
        RuntimeError when the forge does not accept the issue.
    """
    gt = pygitea.API(GITEA_URL, token=git_get_secret())
    gt_version = gt.get('/version').json()
    print("Connected to Gitea, version: " + str(gt_version['version']))

    import_response: requests.Response = gt.post(
        "/repos/" + owner + "/" + repo + "/issues",
        json={
            "body": body,
            "labels": labels,
            "title": title,
        },
    )
    if not import_response.ok:
        print("Erreur à l'envoi du ticket {} à {}".format(title, repo))
        # Carry the HTTP status and body so the caller can report the cause.
        raise RuntimeError(
            "issue creation failed for {}/{} (HTTP {}): {}".format(
                owner, repo, import_response.status_code,
                import_response.text))
    return import_response.ok
|
|
|
|
def gestion_adduser(req, timeout=60):
    """Send the import payload *req* to the gestion user-import endpoint.

    Parameters:
        req: payload (str or bytes) sent as the PUT body; the caller in this
            file passes CSV text (header line plus one data line).
        timeout: seconds given to requests before the call is abandoned;
            without it a stalled server would block the run forever.

    Returns:
        The raw response body as bytes.  The caller in this file treats a
        non-empty body as an error message.

    Raises:
        requests.exceptions.RequestException on network failure or timeout.
    """
    # Encode explicitly: how a str body is encoded is otherwise left to the
    # HTTP stack, which may fall back to Latin-1 and mangle accented names.
    payload = req.encode("utf-8") if isinstance(req, str) else req
    response = requests.put(
        'https://gestion.a-lec.org/api/user/import',
        auth=HTTPBasicAuth('api666', gestion_get_secret()),
        data=payload,
        timeout=timeout,
    )
    return response.content
|
|
|
|
def setup_workdir(workdir=None):
    """Create the working directory and its four queue sub-directories.

    Parameters:
        workdir: root directory to prepare; defaults to WORKDIR.

    Missing parent directories are created as well, and directories that
    already exist are left untouched.

    Raises:
        FileExistsError when one of the expected directories exists but is
        not a directory.
    """
    if workdir is None:
        workdir = WORKDIR
    for queue in ("nouveau", "attente", "valide", "refuse"):
        # makedirs also creates workdir itself and avoids the
        # check-then-create race of listdir() followed by mkdir().
        os.makedirs(os.path.join(workdir, queue), exist_ok=True)
|
|
|
|
def sendmail(headers, data):
    """Hand a message to the local sendmail binary.

    Parameters:
        headers: header block as text; recipients are taken from it because
            sendmail is invoked with "-t".
        data: message body as text; it is quoted-printable encoded before
            being sent.

    The headers are printed to standard output.  A non-zero exit status of
    sendmail is reported on standard output instead of being ignored; no
    exception is raised for it.
    """
    print("***\nHEADERS: {}\n***\n".format(headers))
    msg = bytes(headers + "\n", 'utf-8') \
        + quopri.encodestring(bytes(data, 'utf-8'))
    result = subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
    if result.returncode != 0:
        print("Erreur : sendmail a retourné le code {}".format(
            result.returncode))
|
|
|
|
def _sql_quote(value):
    """Escape *value* for use inside a single-quoted SQL string literal."""
    return str(value).replace("'", "''")


def _csv_quote(value):
    """Escape *value* for use inside a double-quoted CSV field."""
    return str(value).replace('"', '""')


def _query_rows(request):
    """Run *request* once; return (rows, None) or (None, failure details)."""
    try:
        answer = gestion_read(request)
    except Exception as err:  # network failure, timeout, non-JSON body...
        return None, err
    try:
        rows = answer['results']
        len(rows)
    except (KeyError, TypeError):
        # The API answered, but not with a result set: report its answer.
        return None, answer
    return rows, None


def new_registrations_worker():
    """Import every registration found in WORKDIR/nouveau.

    The first line of each file is expected to look like
    ``numcode|name;firstname;addr;postalcode;city;email;attribmail;xmpp;``
    ``tarif;payment;redir``.  For each file the worker:

    1. skips it when a member with the same e-mail and name already exists;
    2. sends the member to the gestion import endpoint as a one-line CSV;
    3. checks that the member now exists and reads its number;
    4. writes ``numero;email;redir;xmpp`` to WORKDIR/attente/<numcode>
       and removes the file from WORKDIR/nouveau.

    Progress and errors are appended to the global BUF report.  A file that
    cannot be parsed, or an API call that fails, is reported and the file is
    left in place; it does not abort the run.
    """
    global BUF
    incoming_dir = WORKDIR + "/nouveau"
    entries = os.listdir(incoming_dir)
    if not entries:
        return

    BUF.append("*** Nouvelles adhésions ***")
    BUF.append("{} nouvelle(s) adhésion(s) reçue(s)".format(len(entries)))

    csv_header = (
        '"numero","date_adhesion","statut_juridique","siren_rna",'
        '"nom","email","mail_redirs","adresse","code_postal","ville","pays",'
        '"telephone","notes","Catégorie membre"'
    )

    # Parse each new registration
    for entry in entries:
        registration = incoming_dir + "/" + entry
        if not os.path.isfile(registration):
            BUF.append("{} n'est pas un fichier".format(registration))
            continue

        try:
            # Drop the line ending so it does not leak into the last field.
            first_line = get_file_content(registration)[0].rstrip("\r\n")
            numcode, content = first_line.split("|")
            (reg_name, reg_firstname, reg_addr, reg_postalcode, reg_city,
             reg_email, reg_attribmail, reg_xmpp, reg_tarif, reg_payment,
             reg_redir) = content.split(";")
        except (IndexError, ValueError):
            BUF.append("Fichier d'adhésion mal formé : {}".format(
                registration))
            continue

        # Write summary
        BUF.append("\n* {}\n".format(numcode))
        BUF.append("Nom: {} {}\nCode postal: {}\n".format(
            reg_name, reg_firstname, reg_postalcode))

        # The API takes raw SQL, so values are escaped by doubling quotes;
        # otherwise any apostrophe in a name breaks the statement.
        lookup = (
            "SELECT numero FROM membres "
            "WHERE email = '{}' and nom = '{} {}'"
        ).format(_sql_quote(reg_attribmail),
                 _sql_quote(reg_name),
                 _sql_quote(reg_firstname))

        # Check if exists
        rows, failure = _query_rows(lookup)
        if rows is None:
            BUF.append("Erreur inconnue d'enregistrement : {}".format(failure))
            continue
        if len(rows) > 0:
            BUF.append("Utilisateur déjà existant !")
            continue

        # Create this new member
        values = [
            "",
            datetime.datetime.now().strftime("%d/%m/%Y"),
            "Personne physique",
            "",
            "{} {}".format(reg_name, reg_firstname),
            reg_attribmail,
            reg_email,
            reg_addr,
            reg_postalcode,
            reg_city,
            "FR",
            "",
            "Adhesion {}, {} par {}".format(numcode, reg_tarif, reg_payment),
            "",
        ]
        csv_line = ",".join('"{}"'.format(_csv_quote(v)) for v in values)
        try:
            answer = gestion_adduser(csv_header + "\n" + csv_line)
        except Exception as err:  # network failure or timeout
            BUF.append("Erreur : {}".format(err))
            continue
        if len(answer) > 0:
            # A non-empty body is reported; the check below decides
            # whether the import actually went through.
            BUF.append("Erreur : {}".format(answer))

        # Check if exists now
        rows, failure = _query_rows(lookup)
        if rows is None:
            BUF.append("Erreur inconnue : {}".format(failure))
            continue
        if len(rows) == 0:
            BUF.append("Inscription échouée pour une raison inconnue...")
            continue

        # Register new member to wait for secretary
        numero = rows[-1]['numero']
        pending_file = WORKDIR + "/attente/" + numcode
        try:
            # set_file_content() refuses to overwrite, so clear any leftover.
            os.remove(pending_file)
        except FileNotFoundError:
            pass
        set_file_content(
            pending_file,
            ["{};{};{};{}".format(numero, reg_email, reg_redir, reg_xmpp)])

        # Cleanup
        os.remove(registration)
|
|
|
|
|
|
def validate_registrations_worker():
    """Process the registrations waiting in WORKDIR/attente.

    Each file is named after its registration code and its first line is
    expected to be ``numero;email;redir;xmpp``.  For each file the worker
    looks the member up by number:

    * no such member: the registration is considered refused and the file
      is moved to WORKDIR/refuse;
    * member in category 2 or 10: nothing is done, the file stays;
    * otherwise: a ticket is opened for the mail account (plus one for XMPP
      when the xmpp field is "on"), the welcome mail is sent and the file
      is moved to WORKDIR/valide.

    Report lines are held in a local buffer and copied to the global BUF
    only once BUF is non-empty or something happens (error, refusal,
    validation).  Errors on one registration are reported and do not abort
    the run.
    """
    pending_dir = WORKDIR + "/attente"
    entries = os.listdir(pending_dir)
    if not entries:
        return

    # Lines held back until there is something worth reporting.
    prebuf = [
        "\n*** Adhésions en validation ***",
        "{} nouvelle(s) adhésion(s) reçue(s)".format(len(entries)),
    ]

    def flush():
        """Move the held-back lines into the global report."""
        BUF.extend(prebuf)
        prebuf.clear()

    def open_ticket(kind, success_line, title, body):
        """Open one ticket; report the outcome and return True on success."""
        try:
            ticket_response = forgejo_post_issue(
                "cominfra", "mail", title, body, [970])
        except Exception as err:  # the forge client raises on any failure
            ticket_response = err
        else:
            # forgejo_post_issue returns the boolean ``ok`` flag; the
            # substring test alone would reject that successful answer.
            if ticket_response is True or "id" in str(ticket_response):
                BUF.append(success_line)
                return True
        BUF.append("Validation échouée : ticket git {} en erreur : {}".format(
            kind, ticket_response))
        return False

    if len(BUF) > 0:
        flush()

    # Parse each pending registration
    for numcode in entries:
        registration = pending_dir + "/" + numcode
        if not os.path.isfile(registration):
            flush()
            BUF.append("{} n'est pas un fichier".format(registration))
            continue

        try:
            # Drop the line ending so the last field compares equal to "on".
            first_line = get_file_content(registration)[0].rstrip("\r\n")
            numero, email, is_redir, is_xmpp = first_line.split(";")
        except (IndexError, ValueError):
            flush()
            BUF.append("Fichier d'adhésion mal formé : {}".format(
                registration))
            continue

        # Write summary
        prebuf.append("\n* {}\n".format(numcode))

        # Gather information (quotes doubled: the API takes raw SQL)
        request = "SELECT * FROM membres WHERE numero = '{}'".format(
            numero.replace("'", "''"))
        answer = None
        try:
            answer = gestion_read(request)
            members = answer['results']
            found = len(members) > 0
        except Exception as err:  # network failure or unexpected answer
            BUF.append("Erreur inconnue : {}".format(
                answer if answer is not None else err))
            continue

        # Check status
        if not found:
            flush()
            BUF.append("Validation échouée : membre refusé.")
            # Register new member to be refused
            try:
                os.replace(registration, WORKDIR + "/refuse/" + numcode)
            except OSError as err:
                BUF.append("Erreur inconnue : {}".format(err))
            continue

        member = members[-1]

        prebuf.append(("Numéro de membre: {}\nNom: {}\nCode postal: {}").format(
            numero,
            member['nom'],
            member['code_postal']))
        prebuf.append(("Catégorie: {}").format(member['id_category']))

        # Check category (validated or not !): members in category 2 or 10
        # are left waiting.
        if member['id_category'] in (2, 10):
            continue

        flush()
        BUF.append("=> MEMBRE VALIDE PAR SECRETARIAT <=")

        # Launch git tickets
        title = "Création d'un compte courriel membre n°{}.".format(numero)
        body = (
            "Bonjour,\n Une demande de création de compte de "
            "courriel ou une redirection a été émise.\n\n"
            "Compte de courriel actuel : {} \n"
            "Compte de courriel désiré : {} \n"
            "Redirection ('on' si oui, vide si non) : {} \n"
        ).format(email, member['email'], is_redir)
        if not open_ticket("MAIL", "Ticket courriel ouvert.", title, body):
            continue

        if is_xmpp == "on":
            # NOTE(review): the XMPP ticket is also filed in the "mail"
            # repository with the same label - confirm this is intended.
            title = "Création d'un compte XMPP membre n°{}".format(numero)
            body = (
                "Bonjour,\n Une demande de création de compte XMPP "
                "a été émise.\n\n"
                "Compte de courriel actuel : {} \n"
                "Compte de XMPP désiré : {} \n"
            ).format(email, member['email'])
            if not open_ticket("XMPP", "Ticket XMPP ouvert.", title, body):
                continue

        # Launch welcoming mail
        mailtextheaders = get_file_content_all(WELCOME_MAIL_HEADERS)
        mailtext = get_file_content_all(WELCOME_MAIL)

        mailtextheaders = mailtextheaders.replace("COURRIEL_INSCRIPTION", email)
        mailtext = mailtext.replace("NUMERO_MEMBRE", numero)
        mailtext = mailtext.replace("COURRIEL_MEMBRE", member['email'])
        mailtext = mailtext.replace("COURRIEL_INSCRIPTION", email)
        mailtext = mailtext.replace("NOM_MEMBRE", member['nom'])

        sendmail(mailtextheaders, mailtext)

        # Register new member to be validated
        try:
            os.replace(registration, WORKDIR + "/valide/" + numcode)
        except OSError as err:
            BUF.append("Erreur inconnue : {}".format(err))
|
|
|
|
def main():
    """Run both workers, then print and mail the accumulated report.

    The summary mail is sent only when the workers appended at least one
    line to BUF.
    """
    setup_workdir()
    new_registrations_worker()
    validate_registrations_worker()

    # End of work: build the summary mail
    mailheaders = get_file_content_all(SUMMARY_MAIL) + "\n"

    report_lines = []
    for entry in BUF:
        report_lines.append(entry + "\n")
        print(entry)

    if report_lines:
        sendmail(mailheaders, "".join(report_lines))
|
|
|
|
## Bootstrap: run the program only when executed as a script
if __name__ == "__main__":
    main()
|