gestion/controle_cotisation/main.py

506 lines
19 KiB
Python
Executable File

#!/usr/bin/python3
# Libre en Communs's cotisation control program
# Copyright (C) 2022-2024 Libre en Communs
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os, requests, json, datetime, shutil, quopri, subprocess, base64
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from email.utils import formataddr
from pathlib import Path
# Version string of this program.
VERSION="1.0.0"
# Each *_SECRET_FILE holds a credential on its first line; the matching
# *_get_secret() helper reads that line and strips the newline.
GESTION_SECRET_FILE="/home/tresorier/.secret/gestion_api_password"
GIT_SECRET_FILE="/home/tresorier/.secret/git_api_password"
# Root of the on-disk state. setup_workdir() creates the "impayé", "expiré",
# "validé" and "transition" sub-directories inside it.
WORKDIR="/srv/validation_cotisation.d"
# Mail templates. They are opened by bare file name, i.e. resolved against
# the current working directory of the process.
MODALITY_MAIL="mail_instructions.txt"
MODALITY_MAIL_HEADERS="mail_instructions_headers.txt"
RECEPT_MAIL="mail_recu.txt"
RECEPT_MAIL_HEADERS="mail_recu_headers.txt"
# NOTE(review): not referenced by any live code visible in this file.
RECEPT_MAIL_ATTACHMENT="mail_recu_attachment.txt"
# Header template of the end-of-run summary mail (read in main()).
SUMMARY_MAIL="mail_recap_header.txt"
SENDMAIL_LOCATION = "/usr/sbin/sendmail" # sendmail location
# Report lines accumulated during a run; main() prints them and mails them
# as the summary.
BUF=[]
# Base URL of the Gitea forge.
GITEA_URL = "https://forge.a-lec.org"
# Update these with your SMTP server details
SMTP_SERVER = 'mail.a-lec.org'
SMTP_PORT = 587
SMTP_USER = 'tresorier@a-lec.org'
SMTP_SECRET_FILE = "/home/tresorier/.secret/smtp_api_password"
# gestion_read("SELECT * FROM services_users su \
# INNER JOIN membres m ON m.id = su.id_user \
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
# LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
# LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
# WHERE m.id = 3 AND l.id_account = 481;")
def gestion_get_secret():
    """Return the first line of GESTION_SECRET_FILE without its newline."""
    with open(GESTION_SECRET_FILE) as secret_file:
        first_line = secret_file.readline()
    # readline() yields at most one "\n", always at the end of the line.
    return first_line.rstrip("\n")
def smtp_get_secret():
    """Return the first line of SMTP_SECRET_FILE without its newline."""
    with open(SMTP_SECRET_FILE) as secret_file:
        first_line = secret_file.readline()
    # readline() yields at most one "\n", always at the end of the line.
    return first_line.rstrip("\n")
def git_get_secret():
    """Return the first line of GIT_SECRET_FILE without its newline."""
    with open(GIT_SECRET_FILE) as secret_file:
        first_line = secret_file.readline()
    # readline() yields at most one "\n", always at the end of the line.
    return first_line.rstrip("\n")
def get_file_content(filename):
    """Return the content of *filename* as a list of lines.

    Line terminators are kept, exactly as ``readlines()`` would return them.
    """
    with open(filename) as handle:
        return list(handle)
def get_file_content_all(filename):
    """Return the whole content of *filename* as a single string."""
    with open(filename) as handle:
        whole_text = handle.read()
    return whole_text
def set_file_content(filename, lines):
    """Create *filename* and write *lines* into it.

    The file is opened in exclusive-creation mode, so FileExistsError is
    raised when *filename* already exists. *lines* is handed to
    ``writelines()``: it may be a list of strings or a plain string.
    Returns whatever ``writelines()`` returns (``None``).
    """
    with open(filename, mode="x") as handle:
        outcome = handle.writelines(lines)
    return outcome
def gestion_read(req, timeout=30):
    """Send the SQL statement *req* to the gestion API and return the decoded JSON.

    Args:
        req: SQL text, posted as the request body.
        timeout: seconds to wait for the server before giving up. requests
            applies no timeout by default, which would let an unattended run
            hang forever on a stalled connection.

    Returns:
        The JSON-decoded response body; callers read its "results" entry.

    Raises:
        requests.exceptions.RequestException: on connection failure or timeout.
    """
    response = requests.post(
        'https://gestion.a-lec.org/api/sql/',
        auth=HTTPBasicAuth('api666', gestion_get_secret()),
        data=req,
        timeout=timeout,
    )
    return response.json()
def git_mail_ticket_read(req, timeout=30):
    """Query the issues of the "cominfra/mail" repository on the forge.

    Args:
        req: dict of query parameters for the issue search (e.g. "state",
            "labels", "q").
        timeout: seconds to wait for the server before giving up.

    Returns:
        The JSON-decoded response body.

    Raises:
        requests.exceptions.RequestException: on connection failure or timeout.
    """
    url = "{}/api/v1/repos/{}/{}/issues".format(GITEA_URL, "cominfra", "mail")
    # The token goes in the Authorization header rather than in the query
    # string, so it does not end up in URLs recorded by logs or proxies.
    response: requests.Response = requests.get(
        url,
        headers={"Authorization": "token {}".format(git_get_secret())},
        params=req,
        timeout=timeout,
    )
    return response.json()
def setup_workdir():
    """Create WORKDIR and its four state sub-directories when they are missing."""
    if not os.path.isdir(WORKDIR):
        os.mkdir(WORKDIR)
    for subdir in ("impayé", "expiré", "validé", "transition"):
        if subdir not in os.listdir(WORKDIR):
            os.mkdir(WORKDIR+"/"+subdir)
# def sendmail(headers, data):
# msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
# subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
# def sendmail_with_attachment(headers, data, attachment_header, attachment, ending):
# msg = bytes(headers + "\n", 'utf-8') \
# + quopri.encodestring(bytes(data, 'utf-8')) \
# + bytes(attachment_header + "\n", 'utf-8') \
# + base64.b64encode(attachment) \
# + bytes(ending, 'utf-8')
# subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def sendmail_with_attachment(headers, data, attachment_path=None, filename=None):
    """Build a MIME message and send it through the configured SMTP server.

    Args:
        headers: header block as text, one "Name: value" per line. Lines
            without ": " are ignored.
        data: plain-text body, sent as UTF-8.
        attachment_path: path of a file to attach; only used when *filename*
            is given too.
        filename: name advertised for the attachment.

    Any failure while talking to the SMTP server is printed, not raised
    (best-effort delivery). An unreadable *attachment_path* does raise,
    because the file is read before the SMTP exchange starts.
    """
    # Parse headers
    msg = MIMEMultipart()
    for header in headers.split("\n"):
        if ": " in header:
            key, value = header.split(": ", 1)
            # NOTE(review): if a template carries MIME-Version/Content-Type
            # lines they are added on top of the ones MIMEMultipart already
            # sets - confirm the templates do not contain them.
            msg[key] = value.strip()

    # Add the email body
    msg.attach(MIMEText(data, 'plain', 'utf-8'))

    # Add the attachment only if attachment_path is provided
    if attachment_path and filename:
        attachment = MIMEBase('application', 'octet-stream')
        with open(attachment_path, "rb") as attach_file:
            attachment.set_payload(attach_file.read())
        encoders.encode_base64(attachment)
        # Passing the name as a parameter lets the email package quote it and
        # RFC 2231-encode non-ASCII characters (e.g. "reçu"); a hand-built
        # "attachment; filename=..." value is mangled for such names.
        attachment.add_header('Content-Disposition', 'attachment', filename=filename)
        msg.attach(attachment)

    # Send the email
    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()  # Start TLS encryption
            server.login(SMTP_USER, smtp_get_secret())
            # send_message() derives the envelope recipients from the To, Cc
            # and Bcc headers (and does not transmit Bcc), instead of handing
            # the raw To header over as a single address.
            server.send_message(msg, from_addr=SMTP_USER)
        print("Email sent successfully")
    except Exception as e:
        print(f"Failed to send email: {e}")
def gestion_get_expired():
    """Return, as a list of strings, the ids of members whose subscription expired.

    The query selects service 1 rows whose expiry date is before today and
    skips members in categories 10, 3, 2 and 8.
    """
    query = (
        "SELECT id_user FROM services_users su "
        "INNER JOIN users m ON m.id = su.id_user "
        "INNER JOIN services s ON s.id = su.id_service "
        "LEFT JOIN services_fees sf ON sf.id = su.id_fee "
        "INNER JOIN (SELECT id, MAX(date) "
        "FROM services_users "
        "GROUP BY id_user, id_service) "
        "AS su2 ON su2.id = su.id "
        "WHERE su.id_service = 1 "
        "AND su.expiry_date < date() "
        "AND NOT (m.id_category = 10 "
        "OR m.id_category = 3 "
        "OR m.id_category = 2 "
        "OR m.id_category = 8);"
    )
    rows = gestion_read(query)["results"]
    return [str(row["id_user"]) for row in rows]
def gestion_get_unpaid():
    """Return, as a list of strings, the ids of members with an unpaid subscription.

    The query selects service 1 rows flagged ``paid = 0`` and skips members
    in categories 10, 3, 2 and 8.
    """
    query = (
        "SELECT id_user FROM services_users su "
        "INNER JOIN users m ON m.id = su.id_user "
        "INNER JOIN services s ON s.id = su.id_service "
        "LEFT JOIN services_fees sf ON sf.id = su.id_fee "
        "INNER JOIN (SELECT id, MAX(date) "
        "FROM services_users "
        "GROUP BY id_user, id_service) "
        "AS su2 ON su2.id = su.id "
        "WHERE su.id_service = 1 "
        "AND su.paid = 0 "
        "AND NOT (m.id_category = 10 "
        "OR m.id_category = 3 "
        "OR m.id_category = 2 "
        "OR m.id_category = 8);"
    )
    rows = gestion_read(query)["results"]
    return [str(row["id_user"]) for row in rows]
def gestion_get_amount(member):
    """Return the "amount" field of the last row the subscription query yields for *member*."""
    where_clause = "WHERE id_user = {};".format(member)
    query = (
        "SELECT amount FROM services_users su "
        "INNER JOIN users m ON m.id = su.id_user "
        "INNER JOIN services s ON s.id = su.id_service "
        "LEFT JOIN services_fees sf ON sf.id = su.id_fee "
        "INNER JOIN (SELECT id, MAX(date) "
        "FROM services_users "
        "GROUP BY id_user, id_service) "
        "AS su2 ON su2.id = su.id "
    ) + where_clause
    last_row = gestion_read(query)["results"][-1]
    return last_row["amount"]
def gestion_get_date(member):
    """Return the "date" field of the last row the subscription query yields for *member*."""
    where_clause = "WHERE id_user = {};".format(member)
    query = (
        "SELECT date FROM services_users su "
        "INNER JOIN users m ON m.id = su.id_user "
        "INNER JOIN services s ON s.id = su.id_service "
        "LEFT JOIN services_fees sf ON sf.id = su.id_fee "
        "INNER JOIN (SELECT id, MAX(date) "
        "FROM services_users "
        "GROUP BY id_user, id_service) "
        "AS su2 ON su2.id = su.id "
    ) + where_clause
    last_row = gestion_read(query)["results"][-1]
    return last_row["date"]
def get_member_infos(member):
    """Return ``(name, numero, email)`` for the user whose id is *member*.

    The user table is queried once and the three fields are read from the
    last returned row.

    Raises:
        Exception: when the answer holds no usable row. The raw answer is
            printed first and the original error is chained as the cause.
    """
    request = "SELECT * FROM users " +\
              "WHERE id = '{}';".format(member)
    answer = gestion_read(request)
    try:
        record = answer['results'][-1]
        return (record['nom'], record['numero'], record['email'])
    except (KeyError, IndexError, TypeError) as err:
        print(answer)
        raise Exception(
            "no usable user record for member {}".format(member)) from err
def check_email_created(member):
    """Tell whether a forge ticket matching *member* exists.

    Members whose number is below 15 are accepted without any lookup. For
    the others, several issue searches are tried in turn and the first one
    that returns something ends the lookup.

    Returns:
        True when the member number is below 15 or a search returned at
        least one element, False otherwise.
    """
    # Get member infos
    _, numero, _ = get_member_infos(member)
    if numero < 15:
        return True

    searches = (
        {
            'state' : "open",
            'labels' : [968],
            'q' : "Création d'un compte courriel membre n°{}".format(numero)
        },
        {
            'state' : "closed",
            'q' : "Création d'un compte courriel membre n°{}".format(numero)
        },
        {
            'state' : "closed",
            'q' : "Inscription {}".format(numero)
        },
        # This search used to be overwritten by the next one before its
        # result was counted; every search is now taken into account.
        {
            'state' : "closed",
            'q' : "Adhésion {}".format(numero)
        },
        {
            'state' : "closed",
            'q' : "Adhésion n° {}".format(numero)
        },
    )
    # any() stops at the first hit, sparing the remaining HTTP requests.
    return any(len(git_mail_ticket_read(search)) >= 1 for search in searches)
def notify_expired(member):
    """Append the expiry notice for *member* to the summary buffer."""
    BUF.extend((
        "* {}".format(member),
        " EXPIRATION ADHESION",
        "",
    ))
def notify_unpaid(member):
    """Mail the payment instructions to *member* and log it in the summary.

    Returns:
        False when check_email_created() rejects the member (nothing is
        sent), True otherwise. A failed SMTP delivery is not reported here.
    """
    if not check_email_created(member):
        return False

    # Get member infos
    name, numero, email = get_member_infos(member)
    # Fetch the amount once. The last two digits are shown as decimals
    # (presumably the API value is in cents - confirm). zfill(3) keeps a
    # leading "0" for values shorter than three digits ("5" -> "0,05").
    raw_amount = str(gestion_get_amount(member)).zfill(3)
    amount = "{},{}".format(raw_amount[:-2], raw_amount[-2:])
    year = gestion_get_date(member)[:4]

    BUF.append("* {} (numero {}), {}, {}".format(member, numero, name, amount))
    BUF.append(" NOTIFICATION MEMBRE")
    BUF.append("")

    mailtext = get_file_content_all(MODALITY_MAIL) + "\n"
    mailtext = mailtext.replace("NOM_COTISANT", name)
    mailtext = mailtext.replace("NUMERO_MEMBRE", str(numero))
    mailtext = mailtext.replace("MONTANT_COTISATION", amount)
    mailtext = mailtext.replace("ANNEE_CIVILE", year)

    mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS) + "\n"
    mailheaders = mailheaders.replace("ANNEE_CIVILE", year)
    mailheaders = mailheaders.replace("COURRIEL-COTISANT", email)

    sendmail_with_attachment(mailheaders, mailtext, None, None)
    return True
def renotify_unpaid(member):
    """Mail a reminder of the payment instructions to *member* and log it.

    Same message as the first notification, except that "Modalit" in the
    header template is replaced by "Rappel_:_modalit".

    Returns:
        False when check_email_created() rejects the member (nothing is
        sent), True otherwise - the same convention as notify_unpaid().
    """
    if not check_email_created(member):
        return False

    # Get member infos
    name, numero, email = get_member_infos(member)
    # Fetch the amount once. The last two digits are shown as decimals
    # (presumably the API value is in cents - confirm). zfill(3) keeps a
    # leading "0" for values shorter than three digits ("5" -> "0,05").
    raw_amount = str(gestion_get_amount(member)).zfill(3)
    amount = "{},{}".format(raw_amount[:-2], raw_amount[-2:])
    year = gestion_get_date(member)[:4]

    BUF.append("* {} (numero {}), {}, {}".format(member, numero, name, amount))
    BUF.append(" RENOTIFICATION MEMBRE")
    BUF.append("")

    mailtext = get_file_content_all(MODALITY_MAIL) + "\n"
    mailtext = mailtext.replace("NOM_COTISANT", name)
    mailtext = mailtext.replace("NUMERO_MEMBRE", str(numero))
    mailtext = mailtext.replace("MONTANT_COTISATION", amount)
    mailtext = mailtext.replace("ANNEE_CIVILE", year)

    mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS) + "\n"
    mailheaders = mailheaders.replace("ANNEE_CIVILE", year)
    mailheaders = mailheaders.replace("COURRIEL-COTISANT", email)
    mailheaders = mailheaders.replace("Modalit", "Rappel_:_modalit")

    sendmail_with_attachment(mailheaders, mailtext, None, None)
    return True
def check_expired_unpaid():
    """Synchronise the "expiré" and "impayé" state directories with the database.

    Each state directory holds one file per member id. Steps:

    1. Records of members who are no longer expired/unpaid are moved to
       "transition"; newly expired/unpaid members get a record and a
       notification.
    2. Every unpaid record stores the date of the last contact (dd/mm/YYYY);
       a member is notified again after more than 300 days and reminded
       after more than 30 days.
    3. Transition records of members who are (again) unpaid or expired are
       deleted.

    The remote lists are fetched once and each directory is listed once per
    step, instead of once per loop iteration.
    """
    expired_dir = WORKDIR+"/expiré"
    unpaid_dir = WORKDIR+"/impayé"
    transition_dir = WORKDIR+"/transition"

    def today():
        # Date format stored in the unpaid records.
        return datetime.datetime.now().strftime("%d/%m/%Y")

    expired_members = gestion_get_expired()
    unpaid_members = gestion_get_unpaid()

    # Check expired members
    expired_records = os.listdir(expired_dir)
    known_expired = set(expired_records)
    if set(expired_members) != known_expired:
        BUF.append("Membres expirés : {}\n".format(expired_members))

        # Check for no-more-expired members
        still_expired = set(expired_members)
        for record in expired_records:
            if record not in still_expired:
                os.rename(expired_dir+"/"+record,
                          transition_dir+"/"+record)

        # Check for new expired members
        for member in expired_members:
            if str(member) not in known_expired:
                set_file_content(expired_dir+"/"+str(member), "")
                # Remember it, so a duplicated id is not created twice.
                known_expired.add(str(member))
                notify_expired(member)

    # Check unpaid members
    unpaid_records = os.listdir(unpaid_dir)
    known_unpaid = set(unpaid_records)
    if set(unpaid_members) != known_unpaid:
        BUF.append("Membres en impayé : {}\n".format(unpaid_members))

        # Check for no-more-unpaid members
        still_unpaid = set(unpaid_members)
        for record in unpaid_records:
            if record not in still_unpaid:
                os.rename(unpaid_dir+"/"+record,
                          transition_dir+"/"+record)

        # Check for new unpaid members and set date of first contact
        for member in unpaid_members:
            if member not in known_unpaid:
                # Get member infos
                name, numero, email = get_member_infos(member)
                if notify_unpaid(member):
                    set_file_content(unpaid_dir+"/"+str(member), today())
                else:
                    # Ancient date for the system to be triggered
                    set_file_content(unpaid_dir+"/"+str(member), "27/09/1983")
                    BUF.append("* {} (numero {}), {}".format(member, numero, name))
                    BUF.append(" COURRIEL NON FONCTIONNEL")
                known_unpaid.add(member)

    # Check periodically for unpaid members
    for record in os.listdir(unpaid_dir):
        record_path = unpaid_dir+"/"+str(record)
        last_contact = datetime.datetime.strptime(
            get_file_content(record_path)[0].replace("\n", ""),
            '%d/%m/%Y')
        elapsed_days = abs(datetime.datetime.now() - last_contact).days

        # Too old to be a relaunch
        if elapsed_days > 300:
            # The record keeps its old date when the notification is refused,
            # so the member is retried on the next run.
            if notify_unpaid(record):
                os.remove(record_path)
                set_file_content(record_path, today())
        # Relaunch
        elif elapsed_days > 30:
            renotify_unpaid(record)
            os.remove(record_path)
            set_file_content(record_path, today())

    # Clean transited member if unpaid or expired
    blocked = set(os.listdir(unpaid_dir)) | set(os.listdir(expired_dir))
    for record in os.listdir(transition_dir):
        if record in blocked:
            os.remove(transition_dir+"/"+record)
def validate(member):
    """Issue the receipt for the transition record *member* and mail it.

    The payment is looked up in the accounting tables, a LaTeX receipt is
    rendered to PDF in WORKDIR/validé and mailed to the member, then the
    transition record is deleted.

    Returns:
        False when check_email_created() rejects the member, None in every
        other case. When the payment reference or the PDF is missing, an
        error is logged in the summary and the transition record is kept,
        so the member is retried on the next run.
    """
    if not check_email_created(member):
        return False

    # Get infos
    request = "SELECT *, l.reference true_reference " +\
              "FROM services_users su " +\
              "INNER JOIN users m ON m.id = su.id_user " +\
              "INNER JOIN services_fees sf ON sf.id = su.id_fee " +\
              "LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id " +\
              "LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction " +\
              "INNER JOIN acc_transactions tr ON tr.id = l.id_transaction " +\
              "WHERE m.id = {} AND l.id_account = 481;".format(member)
    # NOTE(review): the former comment here said "su.id_service = 1 because
    # the membership fee is service 1", but the query has no such filter -
    # confirm whether one is needed.
    try:
        answer = gestion_read(request)["results"][-1]
    except IndexError:
        # No matching row: the record is dropped.
        BUF.append("* {} (numero inconnu)".format(member))
        BUF.append(" SUPPRESSION MEMBRE")
        BUF.append("")
        os.remove(WORKDIR+"/transition/"+member)
        return

    date = datetime.datetime.strptime(
        answer["date"], '%Y-%m-%d').strftime("%d/%m/%Y")
    year = answer["date"][:4]
    # The last two digits are shown as decimals (presumably the API value is
    # in cents - confirm). zfill(3) keeps a leading "0" for values shorter
    # than three digits.
    raw_amount = str(answer["amount"]).zfill(3)
    amount = "{},{}".format(raw_amount[:-2], raw_amount[-2:])
    filename = "{}_reçu_{}".format(
        answer["nom"].replace(" ", "_").replace("'", "_"),
        date.replace("/", "."))

    BUF.append("* {} (numero {}), {}".format(answer["id"],
                                             answer["numero"],
                                             answer["nom"]))
    BUF.append(" COTISATION : {}".format(amount))
    BUF.append(" ANNEE CIVILE : {}".format(year))
    BUF.append(" VALIDATION MEMBRE")
    BUF.append("")

    # Generate receipt
    # NOTE(review): database values are inserted into the LaTeX source
    # unescaped; a name containing "&", "%" or "_" would break the build.
    latexfile = get_file_content_all("RECU_COTISATION.tex")
    latexfile = latexfile.replace("ANNEE-CIVILE", year)
    latexfile = latexfile.replace("NOM-COTISANT", answer["nom"])
    latexfile = latexfile.replace("STATUT-COTISANT", answer["statut_juridique"])
    latexfile = latexfile.replace("ADRESSE-COTISANT", "{}, {} {}".format(
        answer["adresse"],
        answer["code_postal"],
        answer["ville"]))
    latexfile = latexfile.replace("SOMME", amount)
    latexfile = latexfile.replace("DATE-VERSEMENT", date)

    if not answer["true_reference"]:
        BUF.append("Erreur : référence introuvable (déféré)")
        return
    latexfile = latexfile.replace("MODE-VERSEMENT", answer["true_reference"])

    validated_dir = WORKDIR+"/validé/"
    tex_path = validated_dir+filename+".tex"
    pdf_path = validated_dir+filename+".pdf"

    # Drop leftovers of a previous run: the .tex because set_file_content()
    # refuses to overwrite, the .pdf so that an outdated receipt can never
    # be sent when the build below fails.
    for stale in (tex_path, pdf_path):
        try:
            os.remove(stale)
        except FileNotFoundError:
            pass
    set_file_content(tex_path, latexfile)

    # Run pdflatex without a shell: the file name derives from the member's
    # name and must not be interpreted as shell syntax.
    try:
        subprocess.run(["pdflatex", filename+".tex"], cwd=validated_dir)
    except OSError as err:
        print("pdflatex could not be started: {}".format(err))
    if not os.path.isfile(pdf_path):
        BUF.append("Erreur : reçu PDF introuvable (déféré)")
        return

    # Preparing the email
    mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace(
        "COURRIEL-COTISANT", answer["email"]) + "\n"
    mailtext = get_file_content_all(RECEPT_MAIL).replace(
        "ANNEE-CIVILE", year) + "\n"

    # Sending the email with the attached PDF
    sendmail_with_attachment(mailheaders, mailtext, pdf_path, filename + ".pdf")

    # Cleanup
    os.remove(WORKDIR+"/transition/"+member)
def validate_members():
    """Run validate() on every record found in WORKDIR/transition."""
    pending_records = os.listdir(WORKDIR+"/transition")
    for pending in pending_records:
        validate(pending)
def main():
    """Run the whole control pass, then print and mail the summary.

    The summary mail is only sent when at least one line was logged in BUF.
    """
    setup_workdir()
    check_expired_unpaid()
    validate_members()

    # End of work
    # Launch summary mail
    summary_headers = get_file_content_all(SUMMARY_MAIL) + "\n"
    for entry in BUF:
        print(entry)
    if BUF:
        summary_body = "".join(entry + "\n" for entry in BUF)
        # Assuming no attachment is needed for the summary mail
        sendmail_with_attachment(summary_headers, summary_body, None, None)
# Script entry point: run the control pass only when executed directly.
if "__main__" == __name__:
    main()