Compare commits

...

3 Commits

3 changed files with 233 additions and 167 deletions

View File

@ -1,7 +1,7 @@
#!/usr/bin/python3
# Libre en Communs's adhesion control program
# Copyright (C) 2022 Libre en Communs
# Copyright (C) 2022-2024 Libre en Communs
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@ -22,6 +22,9 @@ from typing import List
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
VERSION="1.0.0"
GESTION_SECRET_FILE="/home/secretaire/.secret/gestion_api_password"
@ -34,10 +37,27 @@ SENDMAIL_LOCATION = "/usr/sbin/sendmail" # sendmail location
BUF=[]
GITEA_URL = "https://forge.a-lec.org"
# Update these with your SMTP server details
SMTP_SERVER = 'mail.a-lec.org'
SMTP_PORT = 587
SMTP_USER = 'secretaire@a-lec.org'
SMTP_SECRET_FILE = "/home/secretaire/.secret/smtp_api_password"
# gestion_read("SELECT * FROM services_users su \
# INNER JOIN membres m ON m.id = su.id_user \
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
# LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
# LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
# WHERE m.id = 3 AND l.id_account = 481;")
def gestion_get_secret():
with open(GESTION_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
def smtp_get_secret():
with open(SMTP_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
def git_get_secret():
with open(GIT_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
@ -86,10 +106,31 @@ def setup_workdir():
if not "refuse" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/refuse")
# def sendmail(headers, data):
# print("***\nHEADERS: {}\n***\n".format(headers))
# msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
# subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def sendmail(headers, data):
print("***\nHEADERS: {}\n***\n".format(headers))
msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
# Prepare the email
msg = MIMEMultipart()
for header in headers.split("\n"):
if ": " in header:
key, value = header.split(": ", 1)
msg[key] = value
msg.attach(MIMEText(data, 'plain'))
# Connect to the SMTP server and send the email
try:
server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
server.starttls() # Upgrade the connection to a secure TLS connection
server.login(SMTP_USER, smtp_get_secret())
server.sendmail(SMTP_USER, msg['To'], msg.as_string())
server.quit()
#print("Email sent successfully")
except Exception as e:
print(f"Failed to send email: {e}")
def new_registrations_worker():
if (os.listdir(WORKDIR+"/nouveau") == []):

View File

@ -1,7 +1,7 @@
#!/usr/bin/python3
# Libre en Communs's cotisation control program
# Copyright (C) 2022 Libre en Communs
# Copyright (C) 2022-2024 Libre en Communs
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@ -19,7 +19,13 @@
import os, requests, json, datetime, shutil, quopri, subprocess, base64
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from email.utils import formataddr
from pathlib import Path
VERSION="1.0.0"
GESTION_SECRET_FILE="/home/tresorier/.secret/gestion_api_password"
@ -35,6 +41,12 @@ SENDMAIL_LOCATION = "/usr/sbin/sendmail" # sendmail location
BUF=[]
GITEA_URL = "https://forge.a-lec.org"
# Update these with your SMTP server details
SMTP_SERVER = 'mail.a-lec.org'
SMTP_PORT = 587
SMTP_USER = 'tresorier@a-lec.org'
SMTP_SECRET_FILE = "/home/tresorier/.secret/smtp_api_password"
# gestion_read("SELECT * FROM services_users su \
# INNER JOIN membres m ON m.id = su.id_user \
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
@ -46,6 +58,10 @@ def gestion_get_secret():
with open(GESTION_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
def smtp_get_secret():
with open(SMTP_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
def git_get_secret():
with open(GIT_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
@ -84,18 +100,48 @@ def setup_workdir():
if not "transition" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/transition")
def sendmail(headers, data):
msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
# def sendmail(headers, data):
# msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
# subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def sendmail_with_attachment(headers, data, attachment_header, attachment, ending):
msg = bytes(headers + "\n", 'utf-8') \
+ quopri.encodestring(bytes(data, 'utf-8')) \
+ bytes(attachment_header + "\n", 'utf-8') \
+ base64.b64encode(attachment) \
+ bytes(ending, 'utf-8')
# def sendmail_with_attachment(headers, data, attachment_header, attachment, ending):
# msg = bytes(headers + "\n", 'utf-8') \
# + quopri.encodestring(bytes(data, 'utf-8')) \
# + bytes(attachment_header + "\n", 'utf-8') \
# + base64.b64encode(attachment) \
# + bytes(ending, 'utf-8')
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
# subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def sendmail_with_attachment(headers, data, attachment_path, filename):
# Parse headers
msg = MIMEMultipart()
for header in headers.split("\n"):
if ": " in header:
key, value = header.split(": ", 1)
msg[key] = value
# Add the email body
body = MIMEText(data, 'plain', 'utf-8')
msg.attach(body)
# Add the attachment
attachment = MIMEBase('application', 'octet-stream')
attachment.set_payload(Path(attachment_path).read_bytes())
encoders.encode_base64(attachment)
attachment.add_header('Content-Disposition', f'attachment; filename={filename}')
msg.attach(attachment)
# Send the email
try:
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls() # Start TLS encryption
server.login(SMTP_USER, smtp_get_secret())
server.sendmail(SMTP_USER, msg['To'], msg.as_string())
print("Email sent successfully")
except Exception as e:
print(f"Failed to send email: {e}")
def gestion_get_expired():
request_expired = "SELECT id_user FROM services_users su " +\
@ -396,23 +442,15 @@ def validate(member):
set_file_content(WORKDIR+"/validé/"+filename+".tex", latexfile)
os.system("cd {} && pdflatex {}".format(WORKDIR+"/validé/", filename+".tex"))
# Preparing mail
mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace("COURRIEL-COTISANT",
answer["email"]) + "\n"
mailtext = get_file_content_all(RECEPT_MAIL).replace("ANNEE-CIVILE",
answer["date"][:4]) + "\n"
mailtattach = get_file_content_all(RECEPT_MAIL_ATTACHMENT) + "\n"
# Preparing the email
mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace("COURRIEL-COTISANT", answer["email"]) + "\n"
mailtext = get_file_content_all(RECEPT_MAIL).replace("ANNEE-CIVILE", answer["date"][:4]) + "\n"
# Opening PDF file as binary
attachment = open(WORKDIR+"/validé/"+filename+".pdf", "rb")
data = attachment.read()
attachment.close()
# Sending the email with the attached PDF
pdf_path = WORKDIR+"/validé/"+filename+".pdf"
sendmail_with_attachment(mailheaders, mailtext, pdf_path, filename + ".pdf")
ending = "--------------3yxkFgv0AINs5nd0i6BJrWaV--"
sendmail_with_attachment(mailheaders, mailtext, mailtattach, data, ending)
# The end
# Cleanup
os.remove(WORKDIR+"/transition/"+member)
def validate_members():

View File

@ -1,7 +1,7 @@
#!/usr/bin/python3
# Libre en Communs's mecene control program
# Copyright (C) 2022 Libre en Communs
# Copyright (C) 2022-2024 Libre en Communs
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@ -16,33 +16,42 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os, requests, json, datetime, shutil, quopri, subprocess, base64
import os
import requests
import datetime
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
VERSION = "1.0.0"
GESTION_SECRET_FILE = "/home/tresorier/.secret/gestion_api_password"
WORKDIR = "/srv/validation_don.d"
MODALITY_MAIL = "mail_instructions.txt"
MODALITY_MAIL_HEADERS = "mail_instructions_headers.txt"
RECEPT_MAIL = "mail_recu.txt"
RECEPT_MAIL_HEADERS = "mail_recu_headers.txt"
RECEPT_MAIL_ATTACHMENT = "mail_recu_attachment.txt"
SUMMARY_MAIL = "mail_recap_header.txt"
VERSION="1.0.0"
GESTION_SECRET_FILE="/home/tresorier/.secret/gestion_api_password"
WORKDIR="/srv/validation_don.d"
MODALITY_MAIL="mail_instructions.txt"
MODALITY_MAIL_HEADERS="mail_instructions_headers.txt"
RECEPT_MAIL="mail_recu.txt"
RECEPT_MAIL_HEADERS="mail_recu_headers.txt"
RECEPT_MAIL_ATTACHMENT="mail_recu_attachment.txt"
SUMMARY_MAIL="mail_recap_header.txt"
SENDMAIL_LOCATION = "/usr/sbin/sendmail" # sendmail location
BUF=[]
# SMTP settings (adjust as necessary)
SMTP_SERVER = 'mail.a-lec.org'
SMTP_PORT = 587
SMTP_USER = 'tresorier@a-lec.org'
SMTP_SECRET_FILE = "/home/tresorier/.secret/smtp_api_password"
# gestion_read("SELECT * FROM services_users su \
# INNER JOIN membres m ON m.id = su.id_user \
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
# LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
# LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
# WHERE m.id = 3 AND l.id_account = 481;")
BUF = []
# Utility functions
def gestion_get_secret():
with open(GESTION_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
return sfile.readline().strip()
def smtp_get_secret():
with open(SMTP_SECRET_FILE) as sfile:
return sfile.readline().strip()
def get_file_content(filename):
with open(filename) as sfile:
@ -58,90 +67,95 @@ def set_file_content(filename, lines):
def gestion_read(req):
response = requests.post('https://gestion.a-lec.org/api/sql/',
auth = HTTPBasicAuth('api666', gestion_get_secret()),
data = req)
auth=HTTPBasicAuth('api666', gestion_get_secret()),
data=req)
return response.json()
def setup_workdir():
if not os.path.isdir(WORKDIR):
os.mkdir(WORKDIR)
if not "transition" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/transition")
if not "nouveau" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/nouveau")
if not "validé" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/validé")
for subdir in ['transition', 'nouveau', 'validé']:
path = os.path.join(WORKDIR, subdir)
if not os.path.exists(path):
os.makedirs(path)
def sendmail(headers, data):
msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
# Send email with attachment via SMTP, password fetched from file
def sendmail_with_attachment(headers, body, attachment_path, attachment_filename):
msg = MIMEMultipart()
def sendmail_with_attachment(headers, data, attachment_header, attachment, ending):
msg = bytes(headers + "\n", 'utf-8') \
+ quopri.encodestring(bytes(data, 'utf-8')) \
+ bytes(attachment_header + "\n", 'utf-8') \
+ base64.b64encode(attachment) \
+ bytes(ending, 'utf-8')
# Add headers
for header in headers.split("\n"):
if ": " in header:
key, value = header.split(": ", 1)
msg[key] = value.strip()
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
# Email body
msg.attach(MIMEText(body, 'plain', 'utf-8'))
# Attach PDF file
if attachment_path:
with open(attachment_path, "rb") as attachment:
part = MIMEBase('application', 'octet-stream')
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', f'attachment; filename={attachment_filename}')
msg.attach(part)
# Send email
try:
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls() # Secure the connection
server.login(SMTP_USER, smtp_get_secret()) # Fetch SMTP password from file
server.sendmail(SMTP_USER, msg['To'], msg.as_string())
#print(f"Email sent to {msg['To']}")
except Exception as e:
print(f"Error sending email: {e}")
# Notify unpaid donors
def notify_unpaid(record):
# Check if notified
content = get_file_content_all(WORKDIR+"/transition/"+record).split("\n")
content = get_file_content_all(os.path.join(WORKDIR, "transition", record)).split("\n")
if len(content) > 8 and "notified" in content[8]:
return
return # Already notified
# Get infos
name, surname, address, postal_code, city, email, amount, mode = \
get_file_content_all(WORKDIR+"/transition/"+record).split("\n")[:8]
# Parse donor information
name, surname, address, postal_code, city, email, amount, mode = content[:8]
BUF.append("* {} {}, {}".format(record, name, amount))
BUF.append(f"* {record} {name}, {amount}")
BUF.append(" NOTIFICATION DONATEUR")
BUF.append("")
mailtext = get_file_content_all(MODALITY_MAIL) + "\n"
mailtext = mailtext.replace("NOM_DONNEUR", surname+" "+name)
mailtext = get_file_content_all(MODALITY_MAIL)
mailtext = mailtext.replace("NOM_DONNEUR", f"{surname} {name}")
mailtext = mailtext.replace("NUMERO_DON", str(record))
mailtext = mailtext.replace("MONTANT_DON", amount)
mailtext = mailtext.replace("MODE_DON", mode)
mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS) + "\n"
mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS)
mailheaders = mailheaders.replace("COURRIEL_DONNEUR", email)
# Notify
sendmail(mailheaders, mailtext)
sendmail_with_attachment(mailheaders, mailtext, None, None)
# Indicate as notified
with open(WORKDIR+"/transition/"+record, "a") as sfile:
sfile.write("notified")
# Mark as notified
with open(os.path.join(WORKDIR, "transition", record), "a") as sfile:
sfile.write("notified\n")
# Validate donation records
def validate(record):
# Get infos
request = "SELECT * FROM acc_transactions tr " +\
"INNER JOIN acc_transactions_lines l " +\
" ON tr.id = l.id_transaction " +\
"WHERE tr.notes LIKE '%{}%' and id_account = 469".format(record)
# Note: su.id_service = 1 parceque la cotisation correspond au service 1
request = f"SELECT * FROM acc_transactions tr INNER JOIN acc_transactions_lines l " \
f"ON tr.id = l.id_transaction WHERE tr.notes LIKE '%{record}%' AND id_account = 469"
try:
answer = gestion_read(request)["results"][-1]
except:
return False
except IndexError:
return False # If no result, donation not validated
# Parse donor information
name, surname, address, postal_code, city, email, amount, mode = \
get_file_content_all(WORKDIR+"/transition/"+record).split("\n")[:8]
get_file_content_all(os.path.join(WORKDIR, "transition", record)).split("\n")[:8]
date = datetime.datetime.strptime(
answer["date"],'%Y-%m-%d').strftime("%d/%m/%Y")
date = datetime.datetime.strptime(answer["date"], '%Y-%m-%d').strftime("%d/%m/%Y")
filename = f"{name}_reçu_{record}_{date.replace('/', '.')}"
filename = "{}_reçu_{}_{}".format(
name,
record,
date.replace("/", "."))
BUF.append("* {} {}, {}".format(record, name, "{},{}".format(
str(answer["credit"])[:-2],
str(answer["credit"])[-2:])))
BUF.append(f"* {record} {name}, {amount}")
BUF.append(" VALIDATION DON")
BUF.append("")
@ -150,95 +164,68 @@ def validate(record):
latexfile = latexfile.replace("NUMERO-DON", record)
latexfile = latexfile.replace("ANNEE-CIVILE", answer["date"][:4])
latexfile = latexfile.replace("NOM-DONATEUR", name)
latexfile = latexfile.replace("STATUT-DONATEUR", "Personne physique") # XXX
latexfile = latexfile.replace("ADRESSE-DONATEUR", "{}, {} {}".format(
address,
postal_code,
city))
latexfile = latexfile.replace("SOMME", "{},{}".format(
str(answer["credit"])[:-2],
str(answer["credit"])[-2:]))
latexfile = latexfile.replace("STATUT-DONATEUR", "Personne physique")
latexfile = latexfile.replace("ADRESSE-DONATEUR", f"{address}, {postal_code} {city}")
latexfile = latexfile.replace("SOMME", f"{str(answer['credit'])[:-2]},{str(answer['credit'])[-2:]}")
latexfile = latexfile.replace("DATE-VERSEMENT", date)
latexfile = latexfile.replace("MODE-VERSEMENT", answer["reference"])
latexfile = latexfile.replace("FORME-DON", "Déclaration de don manuel") # XXX
latexfile = latexfile.replace("NATURE-DON", "Numéraire") # XXX
latexfile = latexfile.replace("FORME-DON", "Déclaration de don manuel")
latexfile = latexfile.replace("NATURE-DON", "Numéraire")
try:
os.remove(WORKDIR+"/validé/"+filename+".tex")
except:
pass
set_file_content(WORKDIR+"/validé/"+filename+".tex", latexfile)
os.system("cd {} && pdflatex {}".format(WORKDIR+"/validé/", filename+".tex"))
# Save the LaTeX file and generate the PDF
tex_file_path = os.path.join(WORKDIR, "validé", f"{filename}.tex")
set_file_content(tex_file_path, latexfile)
os.system(f"cd {WORKDIR}/validé/ && pdflatex {filename}.tex")
# Preparing mail
mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace(
"COURRIEL-DON",
email) + "\n"
mailtext = get_file_content_all(RECEPT_MAIL).replace("DATE-DON", date) + "\n"
mailtattach = get_file_content_all(RECEPT_MAIL_ATTACHMENT).replace(
"DATE-DON",
record + "_" + date.replace("/", ".")) + "\n"
# Prepare and send email with attachment
mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace("COURRIEL-DON", email)
mailtext = get_file_content_all(RECEPT_MAIL).replace("DATE-DON", date)
mailtattach = get_file_content_all(RECEPT_MAIL_ATTACHMENT).replace("DATE-DON", f"{record}_{date.replace('/', '.')}")
pdf_path = os.path.join(WORKDIR, "validé", f"{filename}.pdf")
# Opening PDF file as binary
attachment = open(WORKDIR+"/validé/"+filename+".pdf", "rb")
data = attachment.read()
attachment.close()
ending = "--------------3yxkFgv0AINs5nd0i6BJrWaV--"
sendmail_with_attachment(mailheaders, mailtext, mailtattach, data, ending)
# The end
os.remove(WORKDIR+"/transition/"+record)
sendmail_with_attachment(mailheaders, mailtext, pdf_path, f"{filename}.pdf")
# Clean up
os.remove(os.path.join(WORKDIR, "transition", record))
return True
# Process new donation records
def check_record(intent):
numero, content = get_file_content_all(intent).split("|")
name, surname, address, postal_code, city, email, amount, mode = \
content.split(";")[:8]
name, surname, address, postal_code, city, email, amount, mode = content.split(";")[:8]
BUF.append("* {} {}, {}".format(numero, name+" "+surname, amount))
BUF.append(f"* {numero} {name} {surname}, {amount}")
BUF.append(" NOUVEAU DON")
BUF.append("")
lines = [ "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n".format(
name, surname, address, postal_code, city, email, amount, mode
) ]
set_file_content(WORKDIR+"/transition/"+numero, lines)
if numero in os.listdir(WORKDIR+"/transition/"):
os.remove(intent)
lines = [f"{name}\n{surname}\n{address}\n{postal_code}\n{city}\n{email}\n{amount}\n{mode}\n"]
set_file_content(os.path.join(WORKDIR, "transition", numero), lines)
os.remove(intent) # Clean up processed intent file
# Main donation validation workflow
def validate_donors():
# Process new donation intents
for new_intent in os.listdir(os.path.join(WORKDIR, "nouveau")):
check_record(os.path.join(WORKDIR, "nouveau", new_intent))
# Get new
for new_intent in os.listdir(WORKDIR+"/nouveau"):
check_record(WORKDIR+"/nouveau/"+new_intent)
# Validate record
for record in os.listdir(WORKDIR+"/transition"):
# Validate existing records or notify unpaid
for record in os.listdir(os.path.join(WORKDIR, "transition")):
if not validate(record):
notify_unpaid(record)
# Main function
def main():
setup_workdir()
#GET DATA()
validate_donors()
# End of work
# Launch summary mail
# Send summary email
mailheaders = get_file_content_all(SUMMARY_MAIL) + "\n"
mailtext = ""
is_sendable = False;
for line in BUF:
mailtext += line + "\n"
is_sendable = True;
print(line)
if BUF:
mailtext = "\n".join(BUF)
sendmail_with_attachment(mailheaders, mailtext, None, None)
if is_sendable:
sendmail(mailheaders, mailtext)
## Bootstrap
if __name__ == '__main__':
main()