#!/usr/bin/python3 # Libre en Communs's cotisation control program # Copyright (C) 2022 Libre en Communs # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import os, requests, json, datetime, shutil, quopri, subprocess, base64 from requests.auth import HTTPBasicAuth from requests.auth import HTTPDigestAuth VERSION="1.0.0" GESTION_SECRET_FILE="/home/tresorier/.secret/gestion_api_password" GIT_SECRET_FILE="/home/tresorier/.secret/git_api_password" WORKDIR="/srv/validation_cotisation.d" MODALITY_MAIL="mail_instructions.txt" MODALITY_MAIL_HEADERS="mail_instructions_headers.txt" RECEPT_MAIL="mail_recu.txt" RECEPT_MAIL_HEADERS="mail_recu_headers.txt" RECEPT_MAIL_ATTACHMENT="mail_recu_attachment.txt" SUMMARY_MAIL="mail_recap_header.txt" SENDMAIL_LOCATION = "/usr/sbin/sendmail" # sendmail location BUF=[] # gestion_read("SELECT * FROM services_users su \ # INNER JOIN membres m ON m.id = su.id_user \ # INNER JOIN services_fees sf ON sf.id = su.id_fee \ # LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \ # LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \ # WHERE m.id = 3 AND l.id_account = 481;") def gestion_get_secret(): with open(GESTION_SECRET_FILE) as sfile: return sfile.readline().replace("\n", "") def git_get_secret(): with open(GIT_SECRET_FILE) as sfile: return sfile.readline().replace("\n", "") def get_file_content(filename): with open(filename) as sfile: return sfile.readlines() def get_file_content_all(filename): with open(filename) as sfile: return sfile.read() def set_file_content(filename, lines): with open(filename, "x") as sfile: return sfile.writelines(lines) def gestion_read(req): response = requests.post('https://gestion.a-lec.org/api/sql/', auth = HTTPBasicAuth('api666', gestion_get_secret()), data = req) return response.json() def git_mail_ticket_read(req): response = requests.get('https://git.a-lec.org/api/v4/projects/46/issues', headers={'PRIVATE-TOKEN' : git_get_secret()}, params = req) return response.json() def git_xmpp_ticket_create(req): response = requests.post('https://git.a-lec.org/api/v4/projects/44/issues', headers={'PRIVATE-TOKEN' : git_get_secret()}, data = req) return response.json() def gestion_adduser(req): response = requests.put('https://gestion.a-lec.org/api/user/import', auth = HTTPBasicAuth('api666', gestion_get_secret()), data = req) return response.content def setup_workdir(): if not os.path.isdir(WORKDIR): os.mkdir(WORKDIR) if not "impayé" in os.listdir(WORKDIR): os.mkdir(WORKDIR+"/impayé") if not "expiré" in os.listdir(WORKDIR): os.mkdir(WORKDIR+"/expiré") if not "validé" in os.listdir(WORKDIR): os.mkdir(WORKDIR+"/validé") if not "transition" in os.listdir(WORKDIR): os.mkdir(WORKDIR+"/transition") def sendmail(headers, data): msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8')) subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg) def sendmail_with_attachment(headers, data, attachment_header, attachment, ending): msg = bytes(headers + "\n", 'utf-8') \ + quopri.encodestring(bytes(data, 'utf-8')) \ + bytes(attachment_header + "\n", 'utf-8') \ + base64.b64encode(attachment) \ + bytes(ending, 'utf-8') subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg) def gestion_get_expired(): request_expired = "SELECT id_user FROM services_users su " +\ "INNER JOIN membres m ON m.id = su.id_user " +\ "INNER JOIN services s ON s.id = su.id_service " +\ "LEFT JOIN services_fees sf ON sf.id = su.id_fee " +\ "INNER JOIN (SELECT id, MAX(date) " +\ "FROM services_users " +\ "GROUP BY id_user, id_service) " +\ "AS su2 ON su2.id = su.id " +\ "WHERE su.id_service = 1 " +\ "AND su.expiry_date < date() " +\ "AND NOT (m.id_category = 10 " +\ "OR m.id_category = 3 " +\ "OR m.id_category = 2 " +\ "OR m.id_category = 8);" expired_members_list = \ [ str(x["id_user"]) for x in gestion_read(request_expired)["results"]] return expired_members_list.copy() def gestion_get_unpaid(): request_unpaid = "SELECT id_user FROM services_users su " +\ "INNER JOIN membres m ON m.id = su.id_user " +\ "INNER JOIN services s ON s.id = su.id_service " +\ "LEFT JOIN services_fees sf ON sf.id = su.id_fee " +\ "INNER JOIN (SELECT id, MAX(date) " +\ "FROM services_users " +\ "GROUP BY id_user, id_service) " +\ "AS su2 ON su2.id = su.id " +\ "WHERE su.id_service = 1 " +\ "AND su.paid = 0 " +\ "AND NOT (m.id_category = 10 " +\ "OR m.id_category = 3 " +\ "OR m.id_category = 2 " +\ "OR m.id_category = 8);" unpaid_members_list = \ [ str(x["id_user"]) for x in gestion_read(request_unpaid)["results"]] return unpaid_members_list.copy() def gestion_get_amount(member): request_unpaid = "SELECT amount FROM services_users su " +\ "INNER JOIN membres m ON m.id = su.id_user " +\ "INNER JOIN services s ON s.id = su.id_service " +\ "LEFT JOIN services_fees sf ON sf.id = su.id_fee " +\ "INNER JOIN (SELECT id, MAX(date) " +\ "FROM services_users " +\ "GROUP BY id_user, id_service) " +\ "AS su2 ON su2.id = su.id " +\ "WHERE id_user = {};".format(member) return gestion_read(request_unpaid)["results"][-1]["amount"] def gestion_get_date(member): request_unpaid = "SELECT date FROM services_users su " +\ "INNER JOIN membres m ON m.id = su.id_user " +\ "INNER JOIN services s ON s.id = su.id_service " +\ "LEFT JOIN services_fees sf ON sf.id = su.id_fee " +\ "INNER JOIN (SELECT id, MAX(date) " +\ "FROM services_users " +\ "GROUP BY id_user, id_service) " +\ "AS su2 ON su2.id = su.id " +\ "WHERE id_user = {};".format(member) return gestion_read(request_unpaid)["results"][-1]["date"] def get_member_infos(member): request = "SELECT * FROM membres " +\ "WHERE id = '{}';".format(member) try: name = gestion_read(request)['results'][-1]['nom'] numero = gestion_read(request)['results'][-1]['numero'] email = gestion_read(request)['results'][-1]['email'] except: print(gestion_read(request)) raise(Exception) return (name, numero, email) def check_email_created(member): # Get member infos name, numero, email = get_member_infos(member) request = { 'not[labels]' : "Résolu", 'state' : "opened" } answer = git_mail_ticket_read(request) for ticket in answer: if "n°{}.".format(numero) in ticket['title']: return False return True def notify_expired(member): BUF.append("* {}".format(member)) BUF.append(" EXPIRATION ADHESION") BUF.append("") def notify_unpaid(member): if not check_email_created(member): return False # Get member infos name, numero, email = get_member_infos(member) amount = "{},{}".format(str(gestion_get_amount(member))[:-2], str(gestion_get_amount(member))[-2:]) year = gestion_get_date(member)[:4] BUF.append("* {} (numero {}), {}, {} €".format(member, numero, name, amount)) BUF.append(" NOTIFICATION MEMBRE") BUF.append("") mailtext = get_file_content_all(MODALITY_MAIL) + "\n" mailtext = mailtext.replace("NOM_COTISANT", name) mailtext = mailtext.replace("NUMERO_MEMBRE", str(numero)) mailtext = mailtext.replace("MONTANT_COTISATION", amount) mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS) + "\n" mailheaders = mailheaders.replace("ANNEE_CIVILE", year) mailheaders = mailheaders.replace("COURRIEL-COTISANT", email) sendmail(mailheaders, mailtext) return True def renotify_unpaid(member): # Get member infos name, numero, email = get_member_infos(member) amount = "{},{}".format(str(gestion_get_amount(member))[:-2], str(gestion_get_amount(member))[-2:]) year = gestion_get_date(member)[:4] BUF.append("* {} (numero {}), {}, {} €".format(member, numero, name, amount)) BUF.append(" NOTIFICATION MEMBRE") BUF.append("") mailtext = get_file_content_all(MODALITY_MAIL) + "\n" mailtext = mailtext.replace("NOM_COTISANT", name) mailtext = mailtext.replace("NUMERO_MEMBRE", str(numero)) mailtext = mailtext.replace("MONTANT_COTISATION", amount) mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS) + "\n" mailheaders = mailheaders.replace("ANNEE_CIVILE", year) mailheaders = mailheaders.replace("COURRIEL-COTISANT", email) sendmail(mailheaders, mailtext) def check_expired_unpaid(): expired_members = gestion_get_expired() unpaid_members = gestion_get_unpaid() # Check expired members if set(expired_members) != set(os.listdir(WORKDIR+"/expiré")): BUF.append("Membres expirés : {}\n".format(gestion_get_expired())) # Check for no-more-expired members for record in os.listdir(WORKDIR+"/expiré"): if not record in expired_members: os.rename(WORKDIR+"/expiré/"+record, WORKDIR+"/transition/"+record) # Check for new expired members for member in expired_members: if not str(member) in os.listdir(WORKDIR+"/expiré"): set_file_content(WORKDIR+"/expiré/"+str(member), "") notify_expired(member) # Check unpaid members if set(unpaid_members) != set(os.listdir(WORKDIR+"/impayé")): BUF.append("Membres en impayé : {}\n".format(gestion_get_unpaid())) # Check for no-more-unpaid members for record in os.listdir(WORKDIR+"/impayé"): if not record in unpaid_members: os.rename(WORKDIR+"/impayé/"+record, WORKDIR+"/transition/"+record) # Check for new unpaid members and set date of first contact for member in unpaid_members: if not member in os.listdir(WORKDIR+"/impayé"): # Get member infos name, numero, email = get_member_infos(member) if notify_unpaid(member): set_file_content(WORKDIR+"/impayé/"+str(member), str(datetime.datetime.now().strftime("%d/%m/%Y"))) else: # Ancient date for the system to be triggered set_file_content(WORKDIR+"/impayé/"+str(member),"27/09/1983") BUF.append("* {} (numero {}), {}".format(member, numero, name)) BUF.append(" COURRIEL NON FONCTIONNEL") # Check periodically for unpaid members for record in os.listdir(WORKDIR+"/impayé"): last_contact = \ datetime.datetime.strptime( get_file_content(WORKDIR+"/impayé/"+record)[0],'%d/%m/%Y') # Too old to be a relaunch if abs(datetime.datetime.now() - last_contact).days > 300: if notify_unpaid(record): os.remove(WORKDIR+"/impayé/"+str(record)) set_file_content(WORKDIR+"/impayé/"+str(record), str(datetime.datetime.now().strftime("%d/%m/%Y"))) # Relaunch elif abs(datetime.datetime.now() - last_contact).days > 30: renotify_unpaid(record) os.remove(WORKDIR+"/impayé/"+str(record)) set_file_content(WORKDIR+"/impayé/"+str(record), str(datetime.datetime.now().strftime("%d/%m/%Y"))) # Clean transited member if unpaid or expired for record in os.listdir(WORKDIR+"/transition"): if record in os.listdir(WORKDIR+"/impayé") or \ record in os.listdir(WORKDIR+"/expiré"): os.remove(WORKDIR+"/transition/"+record) def validate(member): # Get infos request = "SELECT *, l.reference true_reference " +\ "FROM services_users su " +\ "INNER JOIN membres m ON m.id = su.id_user " +\ "INNER JOIN services_fees sf ON sf.id = su.id_fee " +\ "LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id " +\ "LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction " +\ "INNER JOIN acc_transactions tr ON tr.id = l.id_transaction " +\ "WHERE m.id = {} AND l.id_account = 481;".format(member) # Note: su.id_service = 1 parceque la cotisation correspond au service 1 try: answer = gestion_read(request)["results"][-1] except: print(gestion_read(request)) raise(Exception) date = datetime.datetime.strptime( answer["date"],'%Y-%m-%d').strftime("%d/%m/%Y") filename = "{}_reçu_{}".format( answer["nom"].replace(" ", "_"), date.replace("/", ".")) BUF.append("* {} (numero {}), {}".format(answer["id"], answer["numero"], answer["nom"])) BUF.append(" COTISATION : {},{}€".format(str(answer["amount"])[:-2], str(answer["amount"])[-2:])) BUF.append(" ANNEE CIVILE : {}".format(answer["date"][:4])) BUF.append(" VALIDATION MEMBRE") BUF.append("") # Generate receipt latexfile = get_file_content_all("RECU_COTISATION.tex") latexfile = latexfile.replace("ANNEE-CIVILE", answer["date"][:4]) latexfile = latexfile.replace("NOM-COTISANT", answer["nom"]) latexfile = latexfile.replace("STATUT-COTISANT", answer["statut_juridique"]) latexfile = latexfile.replace("ADRESSE-COTISANT", "{}, {} {}".format( answer["adresse"], answer["code_postal"], answer["ville"])) latexfile = latexfile.replace("SOMME", "{},{}".format( str(answer["amount"])[:-2], str(answer["amount"])[-2:])) latexfile = latexfile.replace("DATE-VERSEMENT", date) latexfile = latexfile.replace("MODE-VERSEMENT", answer["true_reference"]) try: os.remove(WORKDIR+"/validé/"+filename+".tex") except: pass set_file_content(WORKDIR+"/validé/"+filename+".tex", latexfile) os.system("cd {} && pdflatex {}".format(WORKDIR+"/validé/", filename+".tex")) # Preparing mail mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace("COURRIEL-COTISANT", answer["email"]) + "\n" mailtext = get_file_content_all(RECEPT_MAIL).replace("ANNEE-CIVILE", answer["date"][:4]) + "\n" mailtattach = get_file_content_all(RECEPT_MAIL_ATTACHMENT) + "\n" # Opening PDF file as binary attachment = open(WORKDIR+"/validé/"+filename+".pdf", "rb") data = attachment.read() attachment.close() ending = "--------------3yxkFgv0AINs5nd0i6BJrWaV--" sendmail_with_attachment(mailheaders, mailtext, mailtattach, data, ending) # The end os.remove(WORKDIR+"/transition/"+member) def validate_members(): for record in os.listdir(WORKDIR+"/transition"): validate(record) def main(): setup_workdir() check_expired_unpaid() validate_members() # End of work # Launch summary mail mailheaders = get_file_content_all(SUMMARY_MAIL) + "\n" mailtext = "" is_sendable = False; for line in BUF: mailtext += line + "\n" is_sendable = True; print(line) if is_sendable: sendmail(mailheaders, mailtext) ## Bootstrap if __name__ == '__main__': main()