#!/usr/bin/python3
# Libre en Communs's adhesion control program
# Copyright (C) 2022-2024 Libre en Communs
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Process membership pre-registrations stored as files under WORKDIR.

Two passes are run by ``main``:

1. ``new_registrations_worker`` reads every file in ``WORKDIR/nouveau``,
   imports the person into the "gestion" API and writes a follow-up record
   into ``WORKDIR/attente``.
2. ``validate_registrations_worker`` reads every record in
   ``WORKDIR/attente``, looks the member up again, and once the member's
   category is no longer a pending one, opens forge tickets, sends the
   welcome mail and moves the record to ``WORKDIR/valide`` (or to
   ``WORKDIR/refuse`` when the member no longer exists).

Every human-readable event is appended to the module-level ``BUF`` list,
which ``main`` finally prints and mails as a summary.
"""

import base64
import csv
import datetime
import io
import json
import os
import quopri
import random
import re
import shutil
import smtplib
import string
import subprocess
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List

import dateutil.parser
import requests
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth

VERSION = "1.0.0"
GESTION_SECRET_FILE = "/home/secretaire/.secret/gestion_api_password"
GIT_SECRET_FILE = "/home/secretaire/.secret/git_api_password"
WORKDIR = "/srv/validation_pre_adhesion.d"
WELCOME_MAIL = "mail_bienvenue.txt"
WELCOME_MAIL_HEADERS = "mail_bienvenue_header.txt"
SUMMARY_MAIL = "mail_resume.txt"
SENDMAIL_LOCATION = "/usr/sbin/sendmail"  # sendmail location
BUF = []
GITEA_URL = "https://forge.a-lec.org"

# Update these with your SMTP server details
SMTP_SERVER = 'mail.a-lec.org'
SMTP_PORT = 587
SMTP_USER = 'secretaire@a-lec.org'
SMTP_SECRET_FILE = "/home/secretaire/.secret/smtp_api_password"

# Base URL and HTTP basic-auth user of the "gestion" API.
GESTION_URL = "https://gestion.a-lec.org"
GESTION_API_USER = "api666"

# Seconds to wait on any HTTP or SMTP operation before giving up, so that a
# dead server cannot hang the whole run.
NETWORK_TIMEOUT = 30

# Member categories for which a waiting registration is left untouched.
# NOTE(review): the meaning of ids 2 and 10 is defined on the gestion side;
# confirm they are the "not yet validated" categories.
PENDING_CATEGORY_IDS = (2, 10)

# Column names of the CSV document sent to the gestion import endpoint.
_IMPORT_COLUMNS = (
    "numero", "date_adhesion", "statut_juridique", "siren_rna",
    "nom", "email", "mail_redirs", "adresse", "code_postal", "ville", "pays",
    "telephone", "notes", "Catégorie membre",
)

# Example of a richer query accepted by gestion_read:
# gestion_read("SELECT * FROM services_users su \
#     INNER JOIN membres m ON m.id = su.id_user \
#     INNER JOIN services_fees sf ON sf.id = su.id_fee \
#     LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
#     LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
#     WHERE m.id = 3 AND l.id_account = 481;")


def _read_secret(path):
    """Return the first line of the file at *path* without its newline."""
    with open(path) as sfile:
        return sfile.readline().replace("\n", "")


def gestion_get_secret():
    """Return the gestion API password read from GESTION_SECRET_FILE."""
    return _read_secret(GESTION_SECRET_FILE)


def smtp_get_secret():
    """Return the SMTP password read from SMTP_SECRET_FILE."""
    return _read_secret(SMTP_SECRET_FILE)


def git_get_secret():
    """Return the forge API token read from GIT_SECRET_FILE."""
    return _read_secret(GIT_SECRET_FILE)


def get_file_content(filename):
    """Return the content of *filename* as a list of lines (newlines kept)."""
    with open(filename) as sfile:
        return sfile.readlines()


def get_file_content_all(filename):
    """Return the whole content of *filename* as a single string."""
    with open(filename) as sfile:
        return sfile.read()


def set_file_content(filename, lines):
    """Create *filename* and write *lines* into it.

    The file is opened in exclusive-creation mode, so FileExistsError is
    raised when *filename* already exists.
    """
    with open(filename, "x") as sfile:
        return sfile.writelines(lines)


def _utf8(payload):
    """Return *payload* as UTF-8 bytes when it is a str, unchanged otherwise.

    A str request body is encoded by the HTTP stack with a
    version-dependent charset; encoding here makes accented names reliable.
    NOTE(review): assumes the gestion API expects UTF-8 - confirm.
    """
    return payload.encode("utf-8") if isinstance(payload, str) else payload


def gestion_read(req):
    """POST the SQL text *req* to the gestion API and return the decoded JSON.

    Raises requests.RequestException on network failure (including timeout)
    and ValueError when the answer is not JSON.
    """
    response = requests.post(
        GESTION_URL + '/api/sql/',
        auth=HTTPBasicAuth(GESTION_API_USER, gestion_get_secret()),
        data=_utf8(req),
        timeout=NETWORK_TIMEOUT)
    return response.json()


def forgejo_post_issue(owner, repo, title, body, labels):
    """Open an issue on the forge and return the requests.Response.

    The token is sent in the ``Authorization`` header rather than in the
    URL, so it does not end up in access logs or error messages.
    """
    url = '{}/api/v1/repos/{}/{}/issues'.format(GITEA_URL, owner, repo)
    import_response: requests.Response = requests.post(
        url,
        headers={"Authorization": "token " + git_get_secret()},
        data={
            "body": body,
            "labels": labels,
            "title": title,
        },
        timeout=NETWORK_TIMEOUT)
    return import_response


def gestion_adduser(req):
    """PUT the CSV text *req* to the gestion import endpoint.

    Returns the raw response body (bytes); callers treat a non-empty body
    as an error report.
    """
    response = requests.put(
        GESTION_URL + '/api/user/import',
        auth=HTTPBasicAuth(GESTION_API_USER, gestion_get_secret()),
        data=_utf8(req),
        timeout=NETWORK_TIMEOUT)
    return response.content


def setup_workdir():
    """Create WORKDIR and its four state sub-directories when missing."""
    for state in ("nouveau", "attente", "valide", "refuse"):
        os.makedirs(WORKDIR + "/" + state, exist_ok=True)


# Previous implementation, piping the message to the local sendmail binary:
# def sendmail(headers, data):
#     print("***\nHEADERS: {}\n***\n".format(headers))
#     msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
#     subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)

def sendmail(headers, data):
    """Send a plain-text mail through the configured SMTP server.

    headers -- text holding one ``Name: value`` header per line; lines
               without ``": "`` are ignored.
    data    -- body of the mail.

    Returns True when the message was handed to the server, False
    otherwise (the failure is printed, never raised).
    """
    print("***\nHEADERS: {}\n***\n".format(headers))

    # Prepare the email
    msg = MIMEMultipart()
    for header in headers.splitlines():
        if ": " in header:
            key, value = header.split(": ", 1)
            msg[key] = value
    msg.attach(MIMEText(data, 'plain'))

    # Connect to the SMTP server and send the email
    try:
        # The context manager closes the connection even when a step fails.
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT,
                          timeout=NETWORK_TIMEOUT) as server:
            server.starttls()  # Upgrade the connection to a secure TLS connection
            server.login(SMTP_USER, smtp_get_secret())
            # send_message derives the recipients from the To/Cc/Bcc
            # headers, so a comma-separated list reaches every address.
            server.send_message(msg, from_addr=SMTP_USER)
    except Exception as e:
        print(f"Failed to send email: {e}")
        return False
    return True


def _sql_quote(value):
    """Return *value* safe for use inside a single-quoted SQL string literal."""
    return str(value).replace("'", "''")


def _first_line(path):
    """Return the first line of *path* without line ending, or "" if empty."""
    lines = get_file_content(path)
    return lines[0].rstrip("\r\n") if lines else ""


def _query_results(request):
    """Run *request* through gestion_read.

    Returns ``(True, results)`` on success, or ``(False, detail)`` where
    *detail* is the raised exception or the unexpected API answer.
    """
    try:
        answer = gestion_read(request)
    except (requests.RequestException, ValueError, OSError) as exc:
        return False, exc
    try:
        return True, answer['results']
    except (KeyError, TypeError):
        return False, answer


def _build_import_csv(row):
    """Return the two-line, fully quoted CSV document importing *row*."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerow(_IMPORT_COLUMNS)
    writer.writerow(row)
    # No trailing newline after the data row.
    return buffer.getvalue().rstrip("\n")


def _flush(prebuf):
    """Move every pending line of *prebuf* into BUF."""
    BUF.extend(prebuf)
    prebuf.clear()


def _open_ticket(repo, title, body):
    """Open a ticket in the ``cominfra/<repo>`` forge repository.

    Returns ``(created, detail)``; *created* is True only when the forge
    answered with a JSON object holding an ``id``. *detail* is the
    response, or the exception when the request itself failed.
    """
    try:
        response = forgejo_post_issue("cominfra", repo, title, body, [970])
    except (requests.RequestException, OSError) as exc:
        return False, exc
    try:
        payload = response.json()
    except ValueError:
        return False, response
    return isinstance(payload, dict) and "id" in payload, response


def new_registrations_worker():
    """Import every registration found in WORKDIR/nouveau.

    Each file holds one line ``numcode|f1;...;f11``. For each of them the
    member is created through the gestion API, a record
    ``numero;email;redir;xmpp`` is written to WORKDIR/attente/<numcode>
    and the source file is removed. Problems are reported in BUF and the
    offending file is left in place.
    """
    entries = os.listdir(WORKDIR + "/nouveau")
    if not entries:
        return

    BUF.append("*** Nouvelles adhésions ***")
    BUF.append("{} nouvelle(s) adhésion(s) reçue(s)".format(len(entries)))

    # Parse each new registration
    for entry in entries:
        registration = WORKDIR + "/nouveau/" + entry

        if not os.path.isfile(registration):
            BUF.append("{} n'est pas un fichier".format(registration))
            continue

        # The line ending is stripped so that it cannot leak into the last
        # field and split the record written to "attente" over two lines.
        parts = _first_line(registration).split("|")
        fields = parts[1].split(";") if len(parts) == 2 else []
        if len(fields) != 11:
            BUF.append("Fichier d'adhésion illisible : {}".format(registration))
            continue
        numcode = parts[0]
        reg_name, reg_firstname, reg_addr, reg_postalcode, reg_city, \
            reg_email, reg_attribmail, reg_xmpp, reg_tarif, reg_payment, \
            reg_redir = fields

        # numcode becomes a file name below: refuse anything path-like.
        if not numcode or os.path.basename(numcode) != numcode:
            BUF.append("Code d'adhésion invalide : {}".format(registration))
            continue

        # Write summary
        BUF.append("\n* {}\n".format(numcode))
        BUF.append("Nom: {} {}\nCode postal: {}\n".format(
            reg_name, reg_firstname, reg_postalcode))

        # Check if exists
        request = ("SELECT numero FROM users "
                   "WHERE email = '{}' and nom = '{} {}'").format(
            _sql_quote(reg_attribmail),
            _sql_quote(reg_name),
            _sql_quote(reg_firstname))
        ok, payload = _query_results(request)
        if not ok:
            BUF.append("Erreur inconnue d'enregistrement : {}".format(payload))
            continue
        if payload:
            BUF.append("Utilisateur déjà existant !")
            continue

        # Create this new member
        request_csv = _build_import_csv([
            "",
            datetime.datetime.now().strftime("%d/%m/%Y"),
            "Personne physique",
            "",
            "{} {}".format(reg_name, reg_firstname),
            reg_attribmail,
            reg_email,
            reg_addr,
            reg_postalcode,
            reg_city,
            "FR",
            "",
            "Adhesion {}, {} par {}".format(numcode, reg_tarif, reg_payment),
            "",
        ])
        try:
            answer = gestion_adduser(request_csv)
        except (requests.RequestException, OSError) as exc:
            BUF.append("Erreur : {}".format(exc))
            continue
        if len(answer) > 0:
            BUF.append("Erreur : {}".format(answer))

        # Check if exists now
        ok, payload = _query_results(request)
        if not ok:
            BUF.append("Erreur inconnue : {}".format(payload))
            continue
        if not payload:
            BUF.append("Inscription échouée pour une raison inconnue...")
            continue

        # Register new member to wait for secretary
        numero = payload[-1]['numero']
        waiting = WORKDIR + "/attente/" + numcode
        try:
            os.remove(waiting)
        except FileNotFoundError:
            pass
        set_file_content(
            waiting,
            ["{};{};{};{}".format(numero, reg_email, reg_redir, reg_xmpp)])

        # Cleanup
        os.remove(registration)


def validate_registrations_worker():
    """Finalize the registrations waiting in WORKDIR/attente.

    For each record ``numero;email;redir;xmpp`` the member is looked up in
    gestion:

    * no such member: the record is moved to WORKDIR/refuse;
    * category in PENDING_CATEGORY_IDS: nothing is done yet;
    * otherwise: forge tickets are opened, the welcome mail is sent and
      the record is moved to WORKDIR/valide.

    Report lines are collected in a local buffer and only copied to BUF
    once something reportable happens. Lines collected for still-pending
    members are kept and flushed together with the next reportable event.
    """
    entries = os.listdir(WORKDIR + "/attente")
    if not entries:
        return

    prebuf = []
    prebuf.append("\n*** Adhésions en validation ***")
    prebuf.append("{} nouvelle(s) adhésion(s) reçue(s)".format(len(entries)))

    if len(BUF) > 0:
        _flush(prebuf)

    # Parse each waiting registration
    for entry in entries:
        numcode = entry
        registration = WORKDIR + "/attente/" + entry

        if not os.path.isfile(registration):
            _flush(prebuf)
            BUF.append("{} n'est pas un fichier".format(registration))
            continue

        fields = _first_line(registration).split(";")
        if len(fields) != 4:
            _flush(prebuf)
            BUF.append("Fichier d'adhésion illisible : {}".format(registration))
            continue
        numero, email, is_redir, is_xmpp = fields

        # Write summary
        prebuf.append("\n* {}\n".format(numcode))

        # Gather information
        request = "SELECT * FROM users WHERE numero = '{}'".format(
            _sql_quote(numero))
        ok, payload = _query_results(request)
        if not ok:
            _flush(prebuf)  # keep the member context next to the error
            BUF.append("Erreur inconnue : {}".format(payload))
            continue

        # Check status
        if not payload:
            _flush(prebuf)
            BUF.append("Validation échouée : membre refusé.")
            # Register new member to be refused; os.replace overwrites a
            # previous record of the same name.
            os.replace(registration, WORKDIR + "/refuse/" + entry)
            continue

        member = payload[-1]
        prebuf.append(("Numéro de membre: {}\nNom: {}\nCode postal: {}").format(
            numero, member['nom'], member['code_postal']))
        prebuf.append(("Catégorie: {}").format(member['id_category']))

        # Check category (validated or not !)
        if member['id_category'] in PENDING_CATEGORY_IDS:
            continue

        _flush(prebuf)
        BUF.append("=> MEMBRE VALIDE PAR SECRETARIAT <=")

        # Launch git tickets
        title = "Création d'un compte courriel membre n°{}".format(numero)
        body = "Bonjour,\n Une demande de création de compte de " \
            + "courriel ou une redirection a été émise.\n\n" \
            + "Compte de courriel actuel : {} \n".format(email) \
            + "Compte de courriel désiré : {} \n".format(member['email']) \
            + "Redirection ('on' si oui, vide si non) : {} \n".format(is_redir)
        created, detail = _open_ticket("mail", title, body)
        if not created:
            BUF.append("Validation échouée : ticket git MAIL en erreur : {}".format(
                detail))
            continue
        BUF.append("Ticket courriel ouvert.")

        if is_xmpp == "on":
            title = "Création d'un compte XMPP membre n°{}".format(numero)
            body = "Bonjour,\n Une demande de création de compte XMPP " \
                + "a été émise.\n\n" \
                + "Compte de courriel actuel : {} \n".format(email) \
                + "Compte de XMPP désiré : {} \n".format(member['email'])
            created, detail = _open_ticket("xmpp", title, body)
            if not created:
                BUF.append("Validation échouée : ticket git XMPP en erreur : {}".format(
                    detail))
                continue
            BUF.append("Ticket XMPP ouvert.")

        # Launch welcoming mail
        mailtextheaders = get_file_content_all(WELCOME_MAIL_HEADERS)
        mailtext = get_file_content_all(WELCOME_MAIL)

        member_email = str(member['email'] or "")
        member_name = str(member['nom'] or "")
        mailtextheaders = mailtextheaders.replace("COURRIEL_INSCRIPTION", email)
        mailtext = mailtext.replace("NUMERO_MEMBRE", numero)
        mailtext = mailtext.replace("COURRIEL_MEMBRE", member_email)
        mailtext = mailtext.replace("COURRIEL_INSCRIPTION", email)
        mailtext = mailtext.replace("NOM_MEMBRE", member_name)

        if not sendmail(mailtextheaders, mailtext):
            # Tickets are already open, so the record is still archived
            # below; the failure is surfaced in the summary instead.
            BUF.append("Envoi du courriel de bienvenue échoué pour {}".format(
                email))

        # Register new member to be validated
        os.replace(registration, WORKDIR + "/valide/" + entry)


def main():
    """Run both workers, then print and mail the collected summary."""
    setup_workdir()
    new_registrations_worker()
    validate_registrations_worker()

    # End of work: nothing to report means no summary mail.
    if not BUF:
        return

    for line in BUF:
        print(line)

    # Launch summary mail
    mailheaders = get_file_content_all(SUMMARY_MAIL) + "\n"
    mailtext = "".join(line + "\n" for line in BUF)
    sendmail(mailheaders, mailtext)


## Bootstrap
if __name__ == '__main__':
    main()