#!/usr/bin/python3
# Libre en Communs's cotisation control program
# Copyright (C) 2022-2024 Libre en Communs

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""Membership-fee (cotisation) control program.

The script queries the "gestion" SQL API for members whose subscription is
expired or unpaid, mirrors that state as marker files under ``WORKDIR``
(one file per member id in ``impayé``, ``expiré`` and ``transition``),
e-mails payment instructions and reminders, generates a PDF receipt for
members leaving the unpaid/expired state, and finally mails a summary of
everything it did.
"""

import base64
import datetime
import json
import os
import quopri
import shutil
import smtplib
import subprocess
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
from pathlib import Path

import requests
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth

VERSION = "1.0.0"

GESTION_SECRET_FILE = "/home/tresorier/.secret/gestion_api_password"
GIT_SECRET_FILE = "/home/tresorier/.secret/git_api_password"
WORKDIR = "/srv/validation_cotisation.d"

# Mail templates; these are relative paths, so they are resolved against the
# current working directory of the process.
MODALITY_MAIL = "mail_instructions.txt"
MODALITY_MAIL_HEADERS = "mail_instructions_headers.txt"
RECEPT_MAIL = "mail_recu.txt"
RECEPT_MAIL_HEADERS = "mail_recu_headers.txt"
RECEPT_MAIL_ATTACHMENT = "mail_recu_attachment.txt"
SUMMARY_MAIL = "mail_recap_header.txt"
RECEIPT_TEMPLATE = "RECU_COTISATION.tex"

SENDMAIL_LOCATION = "/usr/sbin/sendmail"  # sendmail location

# Lines of the summary e-mail, accumulated during the run.
BUF = []

GITEA_URL = "https://forge.a-lec.org"

# Update these with your SMTP server details
SMTP_SERVER = 'mail.a-lec.org'
SMTP_PORT = 587
SMTP_USER = 'tresorier@a-lec.org'
SMTP_SECRET_FILE = "/home/tresorier/.secret/smtp_api_password"

# Upper bound (seconds) on every HTTP call so a stalled server cannot hang
# the whole run.
HTTP_TIMEOUT = 60

# Headers that MIMEMultipart generates itself; copying them from a template
# would add a second, conflicting header because Message.__setitem__ appends.
_MIME_STRUCTURAL_HEADERS = frozenset(
    ("content-type", "content-transfer-encoding", "mime-version"))

# Joins shared by the subscription queries: they keep, for each
# (id_user, id_service) pair, the services_users row selected together with
# MAX(date).
_SUBSCRIPTION_JOINS = (
    "FROM services_users su "
    "INNER JOIN users m ON m.id = su.id_user "
    "INNER JOIN services s ON s.id = su.id_service "
    "LEFT JOIN services_fees sf ON sf.id = su.id_fee "
    "INNER JOIN (SELECT id, MAX(date) "
    "FROM services_users "
    "GROUP BY id_user, id_service) "
    "AS su2 ON su2.id = su.id "
)

# Member categories left out of the expired/unpaid listings.
# NOTE(review): the meaning of ids 10, 3, 2 and 8 is not visible in this
# file -- confirm against the gestion database.
_EXEMPT_CATEGORIES_CLAUSE = (
    "AND NOT (m.id_category = 10 "
    "OR m.id_category = 3 "
    "OR m.id_category = 2 "
    "OR m.id_category = 8);"
)

# Legacy example query kept for reference:
# gestion_read("SELECT * FROM services_users su \
#     INNER JOIN membres m ON m.id = su.id_user \
#     INNER JOIN services_fees sf ON sf.id = su.id_fee \
#     LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
#     LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
#     WHERE m.id = 3 AND l.id_account = 481;")


def _read_secret(path):
    """Return the first line of the file at *path* without its newline."""
    with open(path) as sfile:
        return sfile.readline().replace("\n", "")


def gestion_get_secret():
    """Return the password of the gestion SQL API."""
    return _read_secret(GESTION_SECRET_FILE)


def smtp_get_secret():
    """Return the password of the SMTP account."""
    return _read_secret(SMTP_SECRET_FILE)


def git_get_secret():
    """Return the access token of the forge (Gitea) API."""
    return _read_secret(GIT_SECRET_FILE)


def get_file_content(filename):
    """Return the content of *filename* as a list of lines."""
    with open(filename, encoding="utf-8") as sfile:
        return sfile.readlines()


def get_file_content_all(filename):
    """Return the whole content of *filename* as one string."""
    with open(filename, encoding="utf-8") as sfile:
        return sfile.read()


def set_file_content(filename, lines):
    """Create *filename* and write *lines* (a string or iterable) into it.

    The file is opened in exclusive-creation mode, so FileExistsError is
    raised when it already exists; callers remove it first when they mean
    to overwrite.
    """
    with open(filename, "x", encoding="utf-8") as sfile:
        return sfile.writelines(lines)


def format_amount(amount):
    """Format *amount* as ``"euros,cents"``, treating its last two digits
    as cents.

    Values shorter than three digits are zero-padded, so 50 gives "0,50"
    rather than ",50" and 5 gives "0,05" rather than ",5".
    """
    digits = str(amount).zfill(3)
    return "{},{}".format(digits[:-2], digits[-2:])


def gestion_read(req):
    """POST the SQL statement *req* to the gestion API; return the decoded
    JSON answer."""
    response = requests.post(
        'https://gestion.a-lec.org/api/sql/',
        auth=HTTPBasicAuth('api666', gestion_get_secret()),
        data=req,
        timeout=HTTP_TIMEOUT)
    return response.json()


def git_mail_ticket_read(req):
    """Search the issues of the ``cominfra/mail`` forge repository.

    *req* is a dict of query parameters (``state``, ``labels``, ``q``).
    Returns the decoded JSON answer.
    """
    response: requests.Response = requests.get(
        '{}/api/v1/repos/{}/{}/issues?access_token={}'.format(
            GITEA_URL, "cominfra", "mail", git_get_secret()),
        params=req,
        timeout=HTTP_TIMEOUT)
    return response.json()


def _state_dir(state):
    """Return the directory under WORKDIR that holds the *state* markers."""
    return os.path.join(WORKDIR, state)


def _today():
    """Return today's date in the ``dd/mm/YYYY`` form used by marker files."""
    return datetime.datetime.now().strftime("%d/%m/%Y")


def setup_workdir():
    """Create WORKDIR and its four state directories when they are missing."""
    for state in ("impayé", "expiré", "validé", "transition"):
        os.makedirs(_state_dir(state), exist_ok=True)


def sendmail_with_attachment(headers, data, attachment_path=None, filename=None):
    """Send a UTF-8 plain-text e-mail through SMTP, optionally with a file.

    headers: newline-separated ``Key: value`` lines; lines without ``": "``
        are ignored.
    data: body of the message.
    attachment_path, filename: when both are given, the file at
        *attachment_path* is attached under the name *filename*.

    Sending is best effort: any failure is printed and not raised.
    """
    # Parse headers
    msg = MIMEMultipart()
    for header in headers.split("\n"):
        if ": " not in header:
            continue
        key, value = header.split(": ", 1)
        if key.strip().lower() in _MIME_STRUCTURAL_HEADERS:
            continue
        msg[key] = value.strip()

    # Add the email body
    msg.attach(MIMEText(data, 'plain', 'utf-8'))

    # Add the attachment only if attachment_path is provided
    if attachment_path and filename:
        attachment = MIMEBase('application', 'octet-stream')
        with open(attachment_path, "rb") as attach_file:
            attachment.set_payload(attach_file.read())
        encoders.encode_base64(attachment)
        # Passing the name as a parameter lets the email package quote it,
        # and RFC 2231-encode it when it is not pure ASCII.
        attachment.add_header('Content-Disposition', 'attachment',
                              filename=filename)
        msg.attach(attachment)

    # Send the email
    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()  # Start TLS encryption
            server.login(SMTP_USER, smtp_get_secret())
            # send_message takes the recipients from the To/Cc/Bcc headers,
            # so several addresses in one header are all served.
            server.send_message(msg, from_addr=SMTP_USER)
        print("Email sent successfully")
    except Exception as e:
        print(f"Failed to send email: {e}")


def sendmail(headers, data):
    """Send a plain-text e-mail without attachment."""
    sendmail_with_attachment(headers, data)


def gestion_get_expired():
    """Return the ids (as strings) of members whose service-1 subscription
    has an expiry date in the past, exempt categories excluded."""
    request_expired = (
        "SELECT id_user " + _SUBSCRIPTION_JOINS
        + "WHERE su.id_service = 1 "
        + "AND su.expiry_date < date() "
        + _EXEMPT_CATEGORIES_CLAUSE)
    return [str(row["id_user"])
            for row in gestion_read(request_expired)["results"]]


def gestion_get_unpaid():
    """Return the ids (as strings) of members whose service-1 subscription
    is flagged unpaid, exempt categories excluded."""
    request_unpaid = (
        "SELECT id_user " + _SUBSCRIPTION_JOINS
        + "WHERE su.id_service = 1 "
        + "AND su.paid = 0 "
        + _EXEMPT_CATEGORIES_CLAUSE)
    return [str(row["id_user"])
            for row in gestion_read(request_unpaid)["results"]]


def gestion_get_amount(member):
    """Return the ``amount`` column of the last subscription row returned
    for *member*."""
    request = ("SELECT amount " + _SUBSCRIPTION_JOINS
               + "WHERE id_user = {};".format(member))
    return gestion_read(request)["results"][-1]["amount"]


def gestion_get_date(member):
    """Return the ``date`` column of the last subscription row returned
    for *member*."""
    request = ("SELECT date " + _SUBSCRIPTION_JOINS
               + "WHERE id_user = {};".format(member))
    return gestion_read(request)["results"][-1]["date"]


def get_member_infos(member):
    """Return ``(nom, numero, email)`` of the user whose id is *member*.

    The API answer is printed and the lookup error re-raised when the
    answer does not contain the expected record.
    """
    request = "SELECT * FROM users " +\
              "WHERE id = '{}';".format(member)
    answer = gestion_read(request)
    try:
        record = answer['results'][-1]
        return (record['nom'], record['numero'], record['email'])
    except (KeyError, IndexError, TypeError):
        print(answer)
        raise


def check_email_created(member):
    """Tell whether a forge ticket about *member*'s mailbox can be found.

    Members whose number is below 15 are always accepted.
    NOTE(review): presumably these predate the ticket workflow -- confirm.
    """
    # Get member infos
    _name, numero, _email = get_member_infos(member)

    if numero < 15:
        return True

    searches = (
        {
            'state': "open",
            'labels': [968],
            'q': "Création d'un compte courriel membre n°{}".format(numero)
        },
        {
            'state': "closed",
            'q': "Création d'un compte courriel membre n°{}".format(numero)
        },
        {
            'state': "closed",
            'q': "Inscription {}".format(numero)
        },
        {
            'state': "closed",
            'q': "Adhésion {}".format(numero)
        },
        {
            'state': "closed",
            'q': "Adhésion n° {}".format(numero)
        },
    )
    # Every search counts; the first non-empty answer is enough.
    return any(git_mail_ticket_read(search) for search in searches)


def notify_expired(member):
    """Record the expiration of *member*'s membership in the summary."""
    BUF.append("* {}".format(member))
    BUF.append(" EXPIRATION ADHESION")
    BUF.append("")


def _send_payment_instructions(member, summary_label, reminder):
    """Mail the payment instructions to *member* and log it in the summary.

    Returns False, without mailing, when no mailbox ticket is found for
    the member; True otherwise.
    """
    if not check_email_created(member):
        return False

    # Get member infos
    name, numero, email = get_member_infos(member)
    amount = format_amount(gestion_get_amount(member))
    year = gestion_get_date(member)[:4]

    BUF.append("* {} (numero {}), {}, {} €".format(member, numero, name, amount))
    BUF.append(summary_label)
    BUF.append("")

    mailtext = get_file_content_all(MODALITY_MAIL) + "\n"
    mailtext = mailtext.replace("NOM_COTISANT", name)
    mailtext = mailtext.replace("NUMERO_MEMBRE", str(numero))
    mailtext = mailtext.replace("MONTANT_COTISATION", amount)
    mailtext = mailtext.replace("ANNEE_CIVILE", year)

    mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS) + "\n"
    mailheaders = mailheaders.replace("ANNEE_CIVILE", year)
    mailheaders = mailheaders.replace("COURRIEL-COTISANT", email)
    if reminder:
        # Turns the templated subject into a reminder subject.
        mailheaders = mailheaders.replace("Modalit", "Rappel_:_modalit")

    sendmail(mailheaders, mailtext)
    return True


def notify_unpaid(member):
    """Send the payment instructions to *member*.

    Returns False when no mailbox ticket is found, True otherwise.
    """
    return _send_payment_instructions(member, " NOTIFICATION MEMBRE", False)


def renotify_unpaid(member):
    """Send the payment instructions to *member* again, as a reminder.

    Returns False when no mailbox ticket is found, True otherwise.
    """
    return _send_payment_instructions(member, " RENOTIFICATION MEMBRE", True)


def _sync_expired(expired_members):
    """Align the ``expiré`` markers with *expired_members*."""
    expired_dir = _state_dir("expiré")
    known = set(os.listdir(expired_dir))

    if set(expired_members) != known:
        BUF.append("Membres expirés : {}\n".format(expired_members))

    # Check for no-more-expired members
    for record in sorted(known):
        if record not in expired_members:
            os.rename(os.path.join(expired_dir, record),
                      os.path.join(_state_dir("transition"), record))

    # Check for new expired members
    for member in expired_members:
        if member in known:
            continue
        set_file_content(os.path.join(expired_dir, member), "")
        known.add(member)
        notify_expired(member)


def _sync_unpaid(unpaid_members):
    """Align the ``impayé`` markers with *unpaid_members* and send the
    first notification to newly unpaid members."""
    unpaid_dir = _state_dir("impayé")
    known = set(os.listdir(unpaid_dir))

    if set(unpaid_members) != known:
        BUF.append("Membres en impayé : {}\n".format(unpaid_members))

    # Check for no-more-unpaid members
    for record in sorted(known):
        if record not in unpaid_members:
            os.rename(os.path.join(unpaid_dir, record),
                      os.path.join(_state_dir("transition"), record))

    # Check for new unpaid members and set date of first contact
    for member in unpaid_members:
        if member in known:
            continue
        marker = os.path.join(unpaid_dir, member)
        if notify_unpaid(member):
            set_file_content(marker, _today())
        else:
            name, numero, _email = get_member_infos(member)
            # Ancient date for the system to be triggered
            set_file_content(marker, "27/09/1983")
            BUF.append("* {} (numero {}), {}".format(member, numero, name))
            BUF.append(" COURRIEL NON FONCTIONNEL")
        known.add(member)


def _restamp(marker):
    """Rewrite *marker* with today's date as the last contact."""
    os.remove(marker)
    set_file_content(marker, _today())


def _relaunch_unpaid():
    """Contact again the unpaid members whose last contact is old enough."""
    unpaid_dir = _state_dir("impayé")
    for record in os.listdir(unpaid_dir):
        marker = os.path.join(unpaid_dir, record)
        last_contact = datetime.datetime.strptime(
            get_file_content(marker)[0].replace("\n", ""), '%d/%m/%Y')
        age_days = abs(datetime.datetime.now() - last_contact).days

        # Too old to be a relaunch
        if age_days > 300:
            if notify_unpaid(record):
                _restamp(marker)
        # Relaunch
        elif age_days > 30:
            renotify_unpaid(record)
            _restamp(marker)


def _purge_transition():
    """Drop the transition markers of members that are unpaid or expired."""
    still_tracked = (set(os.listdir(_state_dir("impayé")))
                     | set(os.listdir(_state_dir("expiré"))))
    transition_dir = _state_dir("transition")
    for record in os.listdir(transition_dir):
        if record in still_tracked:
            os.remove(os.path.join(transition_dir, record))


def check_expired_unpaid():
    """Synchronise the marker files with the gestion database and send the
    notifications and reminders that follow from it."""
    expired_members = gestion_get_expired()
    unpaid_members = gestion_get_unpaid()

    _sync_expired(expired_members)
    _sync_unpaid(unpaid_members)
    _relaunch_unpaid()
    _purge_transition()


def validate(member):
    """Generate and mail the receipt of *member*, then drop its transition
    marker.

    Returns False when no mailbox ticket is found for the member and None
    otherwise. The marker is kept (validation deferred) when the payment
    reference or the generated PDF is missing; it is removed when the
    query returns no row at all.
    """
    if not check_email_created(member):
        return False

    transition_marker = os.path.join(_state_dir("transition"), member)

    # Get infos
    request = ("SELECT *, l.reference true_reference "
               "FROM services_users su "
               "INNER JOIN users m ON m.id = su.id_user "
               "INNER JOIN services_fees sf ON sf.id = su.id_fee "
               "LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id "
               "LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction "
               "INNER JOIN acc_transactions tr ON tr.id = l.id_transaction "
               + "WHERE m.id = {} AND l.id_account = 481;".format(member))
    # Note: su.id_service = 1 because the membership fee is service 1.
    # NOTE(review): the query above does not actually filter on
    # su.id_service -- confirm whether it should.
    try:
        answer = gestion_read(request)["results"][-1]
    except IndexError:
        BUF.append("* {} (numero inconnu)".format(member))
        BUF.append(" SUPPRESSION MEMBRE")
        BUF.append("")
        os.remove(transition_marker)
        return

    date = datetime.datetime.strptime(
        answer["date"], '%Y-%m-%d').strftime("%d/%m/%Y")
    # "/" is replaced as well so that the name stays a single path component.
    safe_name = (answer["nom"].replace(" ", "_").replace("'", "_")
                 .replace("/", "_"))
    filename = "{}_reçu_{}".format(safe_name, date.replace("/", "."))
    amount = format_amount(answer["amount"])
    year = answer["date"][:4]

    BUF.append("* {} (numero {}), {}".format(answer["id"],
                                             answer["numero"],
                                             answer["nom"]))
    BUF.append(" COTISATION : {}€".format(amount))
    BUF.append(" ANNEE CIVILE : {}".format(year))
    BUF.append(" VALIDATION MEMBRE")
    BUF.append("")

    if not answer["true_reference"]:
        BUF.append("Erreur : référence introuvable (déféré)")
        return

    # Generate receipt
    latexfile = get_file_content_all(RECEIPT_TEMPLATE)
    latexfile = latexfile.replace("ANNEE-CIVILE", year)
    latexfile = latexfile.replace("NOM-COTISANT", answer["nom"])
    latexfile = latexfile.replace("STATUT-COTISANT", answer["statut_juridique"])
    latexfile = latexfile.replace("ADRESSE-COTISANT", "{}, {} {}".format(
        answer["adresse"],
        answer["code_postal"],
        answer["ville"]))
    latexfile = latexfile.replace("SOMME", amount)
    latexfile = latexfile.replace("DATE-VERSEMENT", date)
    latexfile = latexfile.replace("MODE-VERSEMENT", answer["true_reference"])

    valid_dir = _state_dir("validé")
    tex_path = os.path.join(valid_dir, filename + ".tex")
    try:
        os.remove(tex_path)
    except FileNotFoundError:
        pass
    set_file_content(tex_path, latexfile)
    # Argument list and cwd instead of a shell string: the file name is
    # derived from the member's name and must never reach a shell.
    subprocess.run(["pdflatex", filename + ".tex"], cwd=valid_dir, check=False)

    pdf_path = os.path.join(valid_dir, filename + ".pdf")
    if not os.path.isfile(pdf_path):
        BUF.append("Erreur : reçu PDF introuvable (déféré)")
        return

    # Preparing the email
    mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace(
        "COURRIEL-COTISANT", answer["email"]) + "\n"
    mailtext = get_file_content_all(RECEPT_MAIL).replace(
        "ANNEE-CIVILE", year) + "\n"

    # Sending the email with the attached PDF
    sendmail_with_attachment(mailheaders, mailtext, pdf_path, filename + ".pdf")

    # Cleanup
    os.remove(transition_marker)


def validate_members():
    """Run validate() on every member that has a transition marker."""
    for record in os.listdir(_state_dir("transition")):
        validate(record)


def main():
    """Run the whole control cycle, then mail the summary if there is one."""
    setup_workdir()
    check_expired_unpaid()
    validate_members()

    # End of work
    # Launch summary mail
    mailheaders = get_file_content_all(SUMMARY_MAIL) + "\n"

    for line in BUF:
        print(line)

    if BUF:
        mailtext = "".join(line + "\n" for line in BUF)
        # Assuming no attachment is needed for the summary mail
        sendmail_with_attachment(mailheaders, mailtext, None, None)


## Bootstrap
if __name__ == '__main__':
    main()