gestion/controle_adhesion/main.py

372 lines
14 KiB
Python
Executable File

#!/usr/bin/python3
# Libre en Communs's adhesion control program
# Copyright (C) 2022 Libre en Communs
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os, requests, json, datetime, shutil, quopri, subprocess, base64, os
import time, random, string, dateutil.parser, re
from typing import List
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Program version string.
VERSION = "1.0.0"

# Files holding the passwords/tokens; the *_get_secret() helpers read
# the first line of each one.
GESTION_SECRET_FILE = "/home/secretaire/.secret/gestion_api_password"
GIT_SECRET_FILE = "/home/secretaire/.secret/git_api_password"
SMTP_SECRET_FILE = "/home/secretaire/.secret/smtp_api_password"

# Root of the registration queue (nouveau/attente/valide/refuse live below it).
WORKDIR = "/srv/validation_pre_adhesion.d"

# Mail templates (relative paths, resolved against the current directory).
WELCOME_MAIL = "mail_bienvenue.txt"
WELCOME_MAIL_HEADERS = "mail_bienvenue_header.txt"
SUMMARY_MAIL = "mail_resume.txt"

# sendmail location
SENDMAIL_LOCATION = "/usr/sbin/sendmail"

# Lines of the summary report, filled by the workers and mailed by main().
BUF = []

# Base URL of the forge hosting the ticket repositories.
GITEA_URL = "https://forge.a-lec.org"

# Update these with your SMTP server details
SMTP_SERVER = "mail.a-lec.org"
SMTP_PORT = 587
SMTP_USER = "secretaire@a-lec.org"
# gestion_read("SELECT * FROM services_users su \
# INNER JOIN membres m ON m.id = su.id_user \
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
# LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
# LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
# WHERE m.id = 3 AND l.id_account = 481;")
def gestion_get_secret():
    """Return the first line of GESTION_SECRET_FILE with newlines removed.

    The value is used as the HTTP basic-auth password for the gestion API.
    """
    with open(GESTION_SECRET_FILE) as secret_file:
        first_line = secret_file.readline()
    return first_line.replace("\n", "")
def smtp_get_secret():
    """Return the first line of SMTP_SECRET_FILE with newlines removed.

    The value is used as the SMTP login password.
    """
    with open(SMTP_SECRET_FILE) as secret_file:
        first_line = secret_file.readline()
    return first_line.replace("\n", "")
def git_get_secret():
    """Return the first line of GIT_SECRET_FILE with newlines removed.

    The value is used as the forge API access token.
    """
    with open(GIT_SECRET_FILE) as secret_file:
        first_line = secret_file.readline()
    return first_line.replace("\n", "")
def get_file_content(filename):
    """Return the content of *filename* as a list of lines.

    Line terminators are kept, exactly as ``readlines()`` returns them.
    """
    with open(filename) as handle:
        lines = handle.readlines()
    return lines
def get_file_content_all(filename):
    """Return the whole content of *filename* as a single string."""
    with open(filename) as handle:
        text = handle.read()
    return text
def set_file_content(filename, lines):
    """Create *filename* and write every string of *lines* into it.

    The file is opened in exclusive-creation mode ("x"), so the call raises
    FileExistsError when *filename* already exists. No line separators are
    added. Returns whatever ``writelines()`` returns (None).
    """
    with open(filename, mode="x") as handle:
        outcome = handle.writelines(lines)
    return outcome
def gestion_read(req, timeout=30):
    """Send the SQL statement *req* to the gestion API and return its JSON.

    Args:
        req: SQL text posted as the request body.
        timeout: seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block forever on a stalled connection.

    Returns:
        The decoded JSON document of the response.

    Raises:
        requests.RequestException: on network failure or timeout.
        ValueError: when the response body is not valid JSON.
    """
    # A str body is encoded as ISO-8859-1 by http.client on older urllib3
    # stacks, which corrupts or rejects accented names. Encode explicitly.
    # NOTE(review): assumes the API expects UTF-8 -- confirm.
    payload = req.encode("utf-8") if isinstance(req, str) else req
    response = requests.post(
        'https://gestion.a-lec.org/api/sql/',
        auth=HTTPBasicAuth('api666', gestion_get_secret()),
        data=payload,
        timeout=timeout,
    )
    return response.json()
def forgejo_post_issue(owner, repo, title, body, labels, timeout=30):
    """Open an issue in the forge repository *owner*/*repo*.

    Args:
        owner: repository owner (user or organisation name).
        repo: repository name.
        title: issue title.
        body: issue text.
        labels: list of label ids attached to the issue.
        timeout: seconds to wait for the forge before giving up.

    Returns:
        The ``requests.Response`` of the API call; the caller inspects it.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    url = "{}/api/v1/repos/{}/{}/issues".format(GITEA_URL, owner, repo)
    # The token travels in the Authorization header instead of the query
    # string so that it does not end up in proxy or server access logs.
    auth_header = {"Authorization": "token {}".format(git_get_secret())}
    import_response: requests.Response = requests.post(
        url,
        headers=auth_header,
        data={
            "body": body,
            "labels": labels,
            "title": title,
        },
        timeout=timeout,
    )
    return import_response
def gestion_adduser(req, timeout=30):
    """Send the CSV document *req* to the gestion user-import endpoint.

    Args:
        req: CSV text (header row plus member rows) sent as the PUT body.
        timeout: seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block forever on a stalled connection.

    Returns:
        The raw response body as bytes. Callers treat a non-empty body as an
        error report.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    # A str body is encoded as ISO-8859-1 by http.client on older urllib3
    # stacks, which corrupts or rejects accented names. Encode explicitly.
    # NOTE(review): assumes the API expects UTF-8 -- confirm.
    payload = req.encode("utf-8") if isinstance(req, str) else req
    response = requests.put(
        'https://gestion.a-lec.org/api/user/import',
        auth=HTTPBasicAuth('api666', gestion_get_secret()),
        data=payload,
        timeout=timeout,
    )
    return response.content
def setup_workdir():
    """Create WORKDIR and its four queue sub-directories when missing.

    A sub-directory is created only when no entry of that name is listed in
    WORKDIR.
    """
    if not os.path.isdir(WORKDIR):
        os.mkdir(WORKDIR)
    for queue_name in ("nouveau", "attente", "valide", "refuse"):
        if queue_name not in os.listdir(WORKDIR):
            os.mkdir(WORKDIR + "/" + queue_name)
# def sendmail(headers, data):
# print("***\nHEADERS: {}\n***\n".format(headers))
# msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
# subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def sendmail(headers, data):
    """Send a plain-text mail through the configured SMTP server.

    Args:
        headers: header block, one ``Name: value`` pair per line. Lines
            without ``": "`` are ignored.
        data: text body of the mail.

    Failures are reported on stdout and never raised, so a mail problem does
    not abort the rest of the run.
    """
    # Function-scope import: only this function needs it.
    from email.utils import getaddresses

    print("***\nHEADERS: {}\n***\n".format(headers))

    # Prepare the email
    msg = MIMEMultipart()
    for header in headers.split("\n"):
        if ": " in header:
            key, value = header.split(": ", 1)
            # strip() drops a stray "\r" left by CRLF template files.
            msg[key.strip()] = value.strip()
    msg.attach(MIMEText(data, 'plain'))

    # Envelope recipients must be a list of bare addresses. Passing the raw
    # To header as one string breaks with several recipients or display names.
    recipients = [
        address
        for _display_name, address in getaddresses(
            msg.get_all('To', []) + msg.get_all('Cc', []))
        if address
    ]
    if not recipients:
        print("Failed to send email: no recipient found in headers")
        return

    # Connect to the SMTP server and send the email
    try:
        # The context manager issues QUIT and closes the socket even when
        # starttls/login/sendmail raises.
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()  # Upgrade the connection to a secure TLS connection
            server.login(SMTP_USER, smtp_get_secret())
            server.sendmail(SMTP_USER, recipients, msg.as_string())
    except Exception as e:
        # Deliberately broad: sending is best-effort and must not stop the job.
        print(f"Failed to send email: {e}")
def _sql_literal(value):
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled (standard SQL escaping), so a name such
    as "d'Alembert" neither breaks nor alters the statement.
    NOTE(review): assumes the backend follows standard quoting -- confirm.
    """
    return "'{}'".format(str(value).replace("'", "''"))


def _parse_new_registration(path):
    """Read a registration file; return (numcode, list of its 11 fields).

    Raises OSError, IndexError or ValueError when the file is unreadable,
    empty or not of the form ``numcode|f1;f2;...;f11``.
    """
    # Drop the line terminator so it cannot end up inside the last field.
    first_line = get_file_content(path)[0].rstrip("\r\n")
    numcode, content = first_line.split("|")
    fields = content.split(";")
    if len(fields) != 11:
        raise ValueError(
            "expected 11 ';'-separated fields, got {}".format(len(fields)))
    return numcode, fields


def _member_import_csv(values):
    """Return the two-line CSV document (header + one member) for the import."""
    # Function-scope imports: only this helper needs them.
    import csv
    import io

    columns = ["numero", "date_adhesion", "statut_juridique", "siren_rna",
               "nom", "email", "mail_redirs", "adresse", "code_postal",
               "ville", "pays", "telephone", "notes", "Catégorie membre"]
    buffer = io.StringIO()
    # QUOTE_ALL reproduces the all-quoted layout and doubles embedded quotes.
    writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerow(columns)
    writer.writerow(values)
    # No trailing newline after the last row.
    return buffer.getvalue().rstrip("\n")


def _fetch_member_rows(request):
    """Run *request* once; return (rows, None) or (None, diagnostic).

    The diagnostic is the API answer when one was received (for instance a
    document without a 'results' key), otherwise the exception raised.
    """
    answer = None
    try:
        answer = gestion_read(request)
        return answer['results'], None
    except Exception as err:
        # Best-effort: a failed lookup is reported and the entry is skipped.
        return None, (answer if answer is not None else err)


def new_registrations_worker():
    """Import every registration of WORKDIR/nouveau into the gestion database.

    For each file: report it in BUF, skip it when a member with the same
    e-mail and name exists, otherwise import it, confirm it is now present,
    write ``numero;email;redir;xmpp`` to WORKDIR/attente/<numcode> and delete
    the source file. Entries that fail are reported in BUF and left in place.
    """
    inbox = WORKDIR + "/nouveau"
    entries = os.listdir(inbox)
    if not entries:
        return

    BUF.append("*** Nouvelles adhésions ***")
    BUF.append("{} nouvelle(s) adhésion(s) reçue(s)".format(len(entries)))

    # Parse each new registration
    for entry in entries:
        registration = inbox + "/" + entry
        if not os.path.isfile(registration):
            BUF.append("{} n'est pas un fichier".format(registration))
            continue

        try:
            numcode, fields = _parse_new_registration(registration)
        except (OSError, IndexError, ValueError) as err:
            # A malformed file must not abort the other registrations.
            BUF.append("Fichier d'adhésion illisible {} : {}".format(
                registration, err))
            continue
        (reg_name, reg_firstname, reg_addr, reg_postalcode, reg_city,
         reg_email, reg_attribmail, reg_xmpp, reg_tarif, reg_payment,
         reg_redir) = fields

        # Write summary
        BUF.append("\n* {}\n".format(numcode))
        BUF.append("Nom: {} {}\nCode postal: {}\n".format(
            reg_name, reg_firstname, reg_postalcode))

        full_name = "{} {}".format(reg_name, reg_firstname)
        lookup = "SELECT numero FROM users WHERE email = {} and nom = {}".format(
            _sql_literal(reg_attribmail), _sql_literal(full_name))

        # Check if exists
        rows, problem = _fetch_member_rows(lookup)
        if problem is not None:
            BUF.append("Erreur inconnue d'enregistrement : {}".format(problem))
            continue
        if len(rows) > 0:
            BUF.append("Utilisateur déjà existant !")
            continue

        # Create this new member
        request = _member_import_csv([
            "",
            datetime.datetime.now().strftime("%d/%m/%Y"),
            "Personne physique",
            "",
            full_name,
            reg_attribmail,
            reg_email,
            reg_addr,
            reg_postalcode,
            reg_city,
            "FR",
            "",
            "Adhesion {}, {} par {}".format(numcode, reg_tarif, reg_payment),
            "",
        ])
        try:
            answer = gestion_adduser(request)
        except requests.RequestException as err:
            BUF.append("Erreur : {}".format(err))
            continue
        if len(answer) > 0:
            # A non-empty body is an error report; decode it for readability.
            if isinstance(answer, bytes):
                answer = answer.decode("utf-8", errors="replace")
            BUF.append("Erreur : {}".format(answer))

        # Check if exists now
        rows, problem = _fetch_member_rows(lookup)
        if problem is not None:
            BUF.append("Erreur inconnue : {}".format(problem))
            continue
        if len(rows) == 0:
            BUF.append("Inscription échouée pour une raison inconnue...")
            continue

        # Register new member to wait for secretary
        numero = rows[-1]['numero']
        waiting_file = WORKDIR + "/attente/" + numcode
        try:
            # set_file_content() refuses to overwrite, so drop any stale file.
            os.remove(waiting_file)
        except FileNotFoundError:
            pass
        set_file_content(
            waiting_file,
            ["{};{};{};{}".format(numero, reg_email, reg_redir, reg_xmpp)])

        # Cleanup
        os.remove(registration)
def _open_forge_ticket(repo, title, body):
    """Open an issue in cominfra/<repo>; return (created, detail).

    *created* is True only for a successful HTTP status whose JSON body is an
    object with an "id" key. A substring test on the stringified body would
    also match error texts such as "invalid token". *detail* is the response,
    or the exception when the request itself failed.
    """
    try:
        # Label id 970 is the hard-coded label used for both ticket kinds.
        response = forgejo_post_issue("cominfra", repo, title, body, [970])
    except requests.RequestException as err:
        return False, err
    if not response.ok:
        return False, response
    try:
        payload = response.json()
    except ValueError:
        # Non-JSON body (for instance an HTML error page from a proxy).
        return False, response
    return isinstance(payload, dict) and "id" in payload, response


def validate_registrations_worker():
    """Process the registrations waiting in WORKDIR/attente.

    Each file holds ``numero;email;redir;xmpp``. The member is looked up:
    - not found: the file is moved to WORKDIR/refuse;
    - category 2 or 10: the file is left in place for a later run;
    - otherwise: forge tickets are opened (mail, plus XMPP when requested),
      the welcome mail is sent and the file is moved to WORKDIR/valide.

    Report lines are held in a local buffer and copied to BUF only when
    something noteworthy happens. Lines held for members left in place are
    therefore emitted only if a later entry triggers a flush.
    """
    waiting_dir = WORKDIR + "/attente"
    entries = os.listdir(waiting_dir)
    if not entries:
        return

    held = [
        "\n*** Adhésions en validation ***",
        "{} nouvelle(s) adhésion(s) reçue(s)".format(len(entries)),
    ]

    def flush():
        """Move the held report lines into BUF."""
        BUF.extend(held)
        held.clear()

    if len(BUF) > 0:
        flush()

    # Parse each new registration
    for numcode in entries:
        registration = waiting_dir + "/" + numcode
        if not os.path.isfile(registration):
            flush()
            BUF.append("{} n'est pas un fichier".format(registration))
            continue

        try:
            # Strip the line terminator so that "on\n" still compares to "on".
            numero, email, is_redir, is_xmpp = \
                get_file_content(registration)[0].rstrip("\r\n").split(";")
        except (OSError, IndexError, ValueError) as err:
            # A malformed file must not abort the other registrations.
            flush()
            BUF.append("Fichier d'attente illisible {} : {}".format(
                registration, err))
            continue

        # Write summary
        held.append("\n* {}\n".format(numcode))

        # Gather information (single quotes doubled: standard SQL escaping)
        request = "SELECT * FROM users WHERE numero = '{}'".format(
            numero.replace("'", "''"))

        # Check status -- the query is sent once and its answer reused.
        answer = None
        try:
            answer = gestion_read(request)
            rows = answer['results']
            member_found = len(rows) > 0
        except Exception as err:
            # Best-effort: report the API answer when there is one, else the
            # exception, then carry on with the next entry.
            flush()
            BUF.append("Erreur inconnue : {}".format(
                answer if answer is not None else err))
            continue

        if not member_found:
            flush()
            BUF.append("Validation échouée : membre refusé.")
            # Register new member to be refused. os.replace overwrites an
            # existing target. The path is built explicitly rather than by
            # replacing "attente" anywhere in the source path.
            try:
                os.replace(registration, WORKDIR + "/refuse/" + numcode)
            except OSError as err:
                BUF.append("Erreur inconnue : {}".format(err))
            continue

        # Gather information
        member = rows[-1]
        held.append(("Numéro de membre: {}\nNom: {}\nCode postal: {}").format(
            numero,
            member['nom'],
            member['code_postal']))
        held.append(("Catégorie: {}").format(member['id_category']))

        # Check category (validated or not !)
        # NOTE(review): 2 and 10 are presumably the not-yet-validated
        # categories -- confirm against the gestion configuration.
        if member['id_category'] in (2, 10):
            continue

        flush()
        BUF.append("=> MEMBRE VALIDE PAR SECRETARIAT <=")

        # Launch git tickets
        # NOTE(review): when a later step fails the file stays in attente, so
        # the next run opens the mail ticket again.
        title = "Création d'un compte courriel membre n°{}".format(numero)
        body = ("Bonjour,\n Une demande de création de compte de "
                "courriel ou une redirection a été émise.\n\n"
                "Compte de courriel actuel : {} \n"
                "Compte de courriel désiré : {} \n"
                "Redirection ('on' si oui, vide si non) : {} \n").format(
                    email, member['email'], is_redir)
        created, detail = _open_forge_ticket("mail", title, body)
        if not created:
            BUF.append("Validation échouée : ticket git MAIL en erreur : {}".format(
                detail))
            continue
        BUF.append("Ticket courriel ouvert.")

        if is_xmpp == "on":
            title = "Création d'un compte XMPP membre n°{}".format(numero)
            body = ("Bonjour,\n Une demande de création de compte XMPP "
                    "a été émise.\n\n"
                    "Compte de courriel actuel : {} \n"
                    "Compte de XMPP désiré : {} \n").format(
                        email, member['email'])
            created, detail = _open_forge_ticket("xmpp", title, body)
            if not created:
                BUF.append("Validation échouée : ticket git XMPP en erreur : {}".format(
                    detail))
                continue
            BUF.append("Ticket XMPP ouvert.")

        # Launch welcoming mail (template paths are relative to the current
        # working directory)
        mailtextheaders = get_file_content_all(WELCOME_MAIL_HEADERS)
        mailtext = get_file_content_all(WELCOME_MAIL)
        mailtextheaders = mailtextheaders.replace("COURRIEL_INSCRIPTION", email)
        mailtext = mailtext.replace("NUMERO_MEMBRE", numero)
        mailtext = mailtext.replace("COURRIEL_MEMBRE", member['email'])
        mailtext = mailtext.replace("COURRIEL_INSCRIPTION", email)
        mailtext = mailtext.replace("NOM_MEMBRE", member['nom'])
        sendmail(mailtextheaders, mailtext)

        # Register new member to be validated
        os.replace(registration, WORKDIR + "/valide/" + numcode)
def main():
    """Run both workers, then print and mail the collected report.

    The summary mail is sent only when BUF holds at least one line; each line
    is also echoed on stdout.
    """
    setup_workdir()
    new_registrations_worker()
    validate_registrations_worker()

    # End of work: build the summary mail from the report buffer.
    summary_headers = get_file_content_all(SUMMARY_MAIL) + "\n"
    body_parts = []
    for report_line in BUF:
        body_parts.append(report_line + "\n")
        print(report_line)

    if body_parts:
        sendmail(summary_headers, "".join(body_parts))
# Script entry point: run the job only when executed directly.
if __name__ == "__main__":
    main()