gestion/controle_cotisation/main.py

438 lines
17 KiB
Python
Executable file

#!/usr/bin/python3
# Libre en Communs's cotisation control program
# Copyright (C) 2022 Libre en Communs
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os, requests, json, datetime, shutil, quopri, subprocess, base64
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
VERSION="1.0.0"
GESTION_SECRET_FILE="/home/tresorier/.secret/gestion_api_password"
GIT_SECRET_FILE="/home/tresorier/.secret/git_api_password"
WORKDIR="/srv/validation_cotisation.d"
MODALITY_MAIL="mail_instructions.txt"
MODALITY_MAIL_HEADERS="mail_instructions_headers.txt"
RECEPT_MAIL="mail_recu.txt"
RECEPT_MAIL_HEADERS="mail_recu_headers.txt"
RECEPT_MAIL_ATTACHMENT="mail_recu_attachment.txt"
SUMMARY_MAIL="mail_recap_header.txt"
SENDMAIL_LOCATION = "/usr/sbin/sendmail" # sendmail location
BUF=[]
# gestion_read("SELECT * FROM services_users su \
# INNER JOIN membres m ON m.id = su.id_user \
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
# LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
# LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
# WHERE m.id = 3 AND l.id_account = 481;")
def gestion_get_secret():
with open(GESTION_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
def git_get_secret():
with open(GIT_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
def get_file_content(filename):
with open(filename) as sfile:
return sfile.readlines()
def get_file_content_all(filename):
with open(filename) as sfile:
return sfile.read()
def set_file_content(filename, lines):
with open(filename, "x") as sfile:
return sfile.writelines(lines)
def gestion_read(req):
response = requests.post('https://gestion.a-lec.org/api/sql/',
auth = HTTPBasicAuth('api666', gestion_get_secret()),
data = req)
return response.json()
def git_mail_ticket_read(req):
response = requests.get('https://git.a-lec.org/api/v4/projects/46/issues',
headers={'PRIVATE-TOKEN' : git_get_secret()},
params = req)
return response.json()
def git_xmpp_ticket_create(req):
response = requests.post('https://git.a-lec.org/api/v4/projects/44/issues',
headers={'PRIVATE-TOKEN' : git_get_secret()},
data = req)
return response.json()
def gestion_adduser(req):
response = requests.put('https://gestion.a-lec.org/api/user/import',
auth = HTTPBasicAuth('api666', gestion_get_secret()),
data = req)
return response.content
def setup_workdir():
if not os.path.isdir(WORKDIR):
os.mkdir(WORKDIR)
if not "impayé" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/impayé")
if not "expiré" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/expiré")
if not "validé" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/validé")
if not "transition" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/transition")
def sendmail(headers, data):
msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def sendmail_with_attachment(headers, data, attachment_header, attachment, ending):
msg = bytes(headers + "\n", 'utf-8') \
+ quopri.encodestring(bytes(data, 'utf-8')) \
+ bytes(attachment_header + "\n", 'utf-8') \
+ base64.b64encode(attachment) \
+ bytes(ending, 'utf-8')
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def gestion_get_expired():
request_expired = "SELECT id_user FROM services_users su " +\
"INNER JOIN membres m ON m.id = su.id_user " +\
"INNER JOIN services s ON s.id = su.id_service " +\
"LEFT JOIN services_fees sf ON sf.id = su.id_fee " +\
"INNER JOIN (SELECT id, MAX(date) " +\
"FROM services_users " +\
"GROUP BY id_user, id_service) " +\
"AS su2 ON su2.id = su.id " +\
"WHERE su.id_service = 1 " +\
"AND su.expiry_date < date() " +\
"AND NOT (m.id_category = 10 " +\
"OR m.id_category = 3 " +\
"OR m.id_category = 2 " +\
"OR m.id_category = 8);"
expired_members_list = \
[ str(x["id_user"]) for x in gestion_read(request_expired)["results"]]
return expired_members_list.copy()
def gestion_get_unpaid():
request_unpaid = "SELECT id_user FROM services_users su " +\
"INNER JOIN membres m ON m.id = su.id_user " +\
"INNER JOIN services s ON s.id = su.id_service " +\
"LEFT JOIN services_fees sf ON sf.id = su.id_fee " +\
"INNER JOIN (SELECT id, MAX(date) " +\
"FROM services_users " +\
"GROUP BY id_user, id_service) " +\
"AS su2 ON su2.id = su.id " +\
"WHERE su.id_service = 1 " +\
"AND su.paid = 0 " +\
"AND NOT (m.id_category = 10 " +\
"OR m.id_category = 3 " +\
"OR m.id_category = 2 " +\
"OR m.id_category = 8);"
unpaid_members_list = \
[ str(x["id_user"]) for x in gestion_read(request_unpaid)["results"]]
return unpaid_members_list.copy()
def gestion_get_amount(member):
request_unpaid = "SELECT amount FROM services_users su " +\
"INNER JOIN membres m ON m.id = su.id_user " +\
"INNER JOIN services s ON s.id = su.id_service " +\
"LEFT JOIN services_fees sf ON sf.id = su.id_fee " +\
"INNER JOIN (SELECT id, MAX(date) " +\
"FROM services_users " +\
"GROUP BY id_user, id_service) " +\
"AS su2 ON su2.id = su.id " +\
"WHERE id_user = {};".format(member)
return gestion_read(request_unpaid)["results"][-1]["amount"]
def gestion_get_date(member):
request_unpaid = "SELECT date FROM services_users su " +\
"INNER JOIN membres m ON m.id = su.id_user " +\
"INNER JOIN services s ON s.id = su.id_service " +\
"LEFT JOIN services_fees sf ON sf.id = su.id_fee " +\
"INNER JOIN (SELECT id, MAX(date) " +\
"FROM services_users " +\
"GROUP BY id_user, id_service) " +\
"AS su2 ON su2.id = su.id " +\
"WHERE id_user = {};".format(member)
return gestion_read(request_unpaid)["results"][-1]["date"]
def get_member_infos(member):
request = "SELECT * FROM membres " +\
"WHERE id = '{}';".format(member)
try:
name = gestion_read(request)['results'][-1]['nom']
numero = gestion_read(request)['results'][-1]['numero']
email = gestion_read(request)['results'][-1]['email']
except:
print(gestion_read(request))
raise(Exception)
return (name, numero, email)
def check_email_created(member):
# Get member infos
name, numero, email = get_member_infos(member)
request = { 'not[labels]' : "Résolu",
'state' : "opened" }
answer = git_mail_ticket_read(request)
for ticket in answer:
if "{}.".format(numero) in ticket['title']:
return False
return True
def notify_expired(member):
BUF.append("* {}".format(member))
BUF.append(" EXPIRATION ADHESION")
BUF.append("")
def notify_unpaid(member):
if not check_email_created(member):
return False
# Get member infos
name, numero, email = get_member_infos(member)
amount = "{},{}".format(str(gestion_get_amount(member))[:-2],
str(gestion_get_amount(member))[-2:])
year = gestion_get_date(member)[:4]
BUF.append("* {} (numero {}), {}, {}".format(member, numero, name, amount))
BUF.append(" NOTIFICATION MEMBRE")
BUF.append("")
mailtext = get_file_content_all(MODALITY_MAIL) + "\n"
mailtext = mailtext.replace("NOM_COTISANT", name)
mailtext = mailtext.replace("NUMERO_MEMBRE", str(numero))
mailtext = mailtext.replace("MONTANT_COTISATION", amount)
mailtext = mailtext.replace("ANNEE_CIVILE", year)
mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS) + "\n"
mailheaders = mailheaders.replace("ANNEE_CIVILE", year)
mailheaders = mailheaders.replace("COURRIEL-COTISANT", email)
sendmail(mailheaders, mailtext)
return True
def renotify_unpaid(member):
# Get member infos
name, numero, email = get_member_infos(member)
amount = "{},{}".format(str(gestion_get_amount(member))[:-2],
str(gestion_get_amount(member))[-2:])
year = gestion_get_date(member)[:4]
BUF.append("* {} (numero {}), {}, {}".format(member, numero, name, amount))
BUF.append(" RENOTIFICATION MEMBRE")
BUF.append("")
mailtext = get_file_content_all(MODALITY_MAIL) + "\n"
mailtext = mailtext.replace("NOM_COTISANT", name)
mailtext = mailtext.replace("NUMERO_MEMBRE", str(numero))
mailtext = mailtext.replace("MONTANT_COTISATION", amount)
mailtext = mailtext.replace("ANNEE_CIVILE", year)
mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS) + "\n"
mailheaders = mailheaders.replace("ANNEE_CIVILE", year)
mailheaders = mailheaders.replace("COURRIEL-COTISANT", email)
mailheaders = mailheaders.replace("Modalit", "Rappel_:_modalit")
sendmail(mailheaders, mailtext)
def check_expired_unpaid():
expired_members = gestion_get_expired()
unpaid_members = gestion_get_unpaid()
# Check expired members
if set(expired_members) != set(os.listdir(WORKDIR+"/expiré")):
BUF.append("Membres expirés : {}\n".format(gestion_get_expired()))
# Check for no-more-expired members
for record in os.listdir(WORKDIR+"/expiré"):
if not record in expired_members:
os.rename(WORKDIR+"/expiré/"+record,
WORKDIR+"/transition/"+record)
# Check for new expired members
for member in expired_members:
if not str(member) in os.listdir(WORKDIR+"/expiré"):
set_file_content(WORKDIR+"/expiré/"+str(member), "")
notify_expired(member)
# Check unpaid members
if set(unpaid_members) != set(os.listdir(WORKDIR+"/impayé")):
BUF.append("Membres en impayé : {}\n".format(gestion_get_unpaid()))
# Check for no-more-unpaid members
for record in os.listdir(WORKDIR+"/impayé"):
if not record in unpaid_members:
os.rename(WORKDIR+"/impayé/"+record,
WORKDIR+"/transition/"+record)
# Check for new unpaid members and set date of first contact
for member in unpaid_members:
if not member in os.listdir(WORKDIR+"/impayé"):
# Get member infos
name, numero, email = get_member_infos(member)
if notify_unpaid(member):
set_file_content(WORKDIR+"/impayé/"+str(member),
str(datetime.datetime.now().strftime("%d/%m/%Y")))
else:
# Ancient date for the system to be triggered
set_file_content(WORKDIR+"/impayé/"+str(member),"27/09/1983")
BUF.append("* {} (numero {}), {}".format(member, numero, name))
BUF.append(" COURRIEL NON FONCTIONNEL")
# Check periodically for unpaid members
for record in os.listdir(WORKDIR+"/impayé"):
last_contact = \
datetime.datetime.strptime(
get_file_content(WORKDIR+"/impayé/"+record)[0].replace("\n", ""),
'%d/%m/%Y')
# Too old to be a relaunch
if abs(datetime.datetime.now() - last_contact).days > 300:
if notify_unpaid(record):
os.remove(WORKDIR+"/impayé/"+str(record))
set_file_content(WORKDIR+"/impayé/"+str(record),
str(datetime.datetime.now().strftime("%d/%m/%Y")))
# Relaunch
elif abs(datetime.datetime.now() - last_contact).days > 30:
renotify_unpaid(record)
os.remove(WORKDIR+"/impayé/"+str(record))
set_file_content(WORKDIR+"/impayé/"+str(record),
str(datetime.datetime.now().strftime("%d/%m/%Y")))
# Clean transited member if unpaid or expired
for record in os.listdir(WORKDIR+"/transition"):
if record in os.listdir(WORKDIR+"/impayé") or \
record in os.listdir(WORKDIR+"/expiré"):
os.remove(WORKDIR+"/transition/"+record)
def validate(member):
# Get infos
request = "SELECT *, l.reference true_reference " +\
"FROM services_users su " +\
"INNER JOIN membres m ON m.id = su.id_user " +\
"INNER JOIN services_fees sf ON sf.id = su.id_fee " +\
"LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id " +\
"LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction " +\
"INNER JOIN acc_transactions tr ON tr.id = l.id_transaction " +\
"WHERE m.id = {} AND l.id_account = 481;".format(member)
# Note: su.id_service = 1 parceque la cotisation correspond au service 1
try:
answer = gestion_read(request)["results"][-1]
except IndexError:
BUF.append("* {} (numero inconnu)".format(member))
BUF.append(" SUPPRESSION MEMBRE")
BUF.append("")
os.remove(WORKDIR+"/transition/"+member)
return
date = datetime.datetime.strptime(
answer["date"],'%Y-%m-%d').strftime("%d/%m/%Y")
filename = "{}_reçu_{}".format(
answer["nom"].replace(" ", "_"),
date.replace("/", "."))
BUF.append("* {} (numero {}), {}".format(answer["id"],
answer["numero"],
answer["nom"]))
BUF.append(" COTISATION : {},{}".format(str(answer["amount"])[:-2],
str(answer["amount"])[-2:]))
BUF.append(" ANNEE CIVILE : {}".format(answer["date"][:4]))
BUF.append(" VALIDATION MEMBRE")
BUF.append("")
# Generate receipt
latexfile = get_file_content_all("RECU_COTISATION.tex")
latexfile = latexfile.replace("ANNEE-CIVILE", answer["date"][:4])
latexfile = latexfile.replace("NOM-COTISANT", answer["nom"])
latexfile = latexfile.replace("STATUT-COTISANT", answer["statut_juridique"])
latexfile = latexfile.replace("ADRESSE-COTISANT", "{}, {} {}".format(
answer["adresse"],
answer["code_postal"],
answer["ville"]))
latexfile = latexfile.replace("SOMME", "{},{}".format(
str(answer["amount"])[:-2],
str(answer["amount"])[-2:]))
latexfile = latexfile.replace("DATE-VERSEMENT", date)
latexfile = latexfile.replace("MODE-VERSEMENT", answer["true_reference"])
try:
os.remove(WORKDIR+"/validé/"+filename+".tex")
except:
pass
set_file_content(WORKDIR+"/validé/"+filename+".tex", latexfile)
os.system("cd {} && pdflatex {}".format(WORKDIR+"/validé/", filename+".tex"))
# Preparing mail
mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace("COURRIEL-COTISANT",
answer["email"]) + "\n"
mailtext = get_file_content_all(RECEPT_MAIL).replace("ANNEE-CIVILE",
answer["date"][:4]) + "\n"
mailtattach = get_file_content_all(RECEPT_MAIL_ATTACHMENT) + "\n"
# Opening PDF file as binary
attachment = open(WORKDIR+"/validé/"+filename+".pdf", "rb")
data = attachment.read()
attachment.close()
ending = "--------------3yxkFgv0AINs5nd0i6BJrWaV--"
sendmail_with_attachment(mailheaders, mailtext, mailtattach, data, ending)
# The end
os.remove(WORKDIR+"/transition/"+member)
def validate_members():
for record in os.listdir(WORKDIR+"/transition"):
validate(record)
def main():
setup_workdir()
check_expired_unpaid()
validate_members()
# End of work
# Launch summary mail
mailheaders = get_file_content_all(SUMMARY_MAIL) + "\n"
mailtext = ""
is_sendable = False;
for line in BUF:
mailtext += line + "\n"
is_sendable = True;
print(line)
if is_sendable:
sendmail(mailheaders, mailtext)
## Bootstrap
if __name__ == '__main__':
main()