Compare commits

..

No commits in common. "bf39e8a8b9a8922ef1e20d14265a1590eeccac65" and "43774b4af7f71695cd08e6cbe7778accf3e16392" have entirely different histories.

3 changed files with 167 additions and 233 deletions

View File

@ -1,7 +1,7 @@
#!/usr/bin/python3
# Libre en Communs's adhesion control program
# Copyright (C) 2022-2024 Libre en Communs
# Copyright (C) 2022 Libre en Communs
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@ -22,9 +22,6 @@ from typing import List
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
VERSION="1.0.0"
GESTION_SECRET_FILE="/home/secretaire/.secret/gestion_api_password"
@ -37,27 +34,10 @@ SENDMAIL_LOCATION = "/usr/sbin/sendmail" # sendmail location
BUF=[]
GITEA_URL = "https://forge.a-lec.org"
# Update these with your SMTP server details
SMTP_SERVER = 'mail.a-lec.org'
SMTP_PORT = 587
SMTP_USER = 'secretaire@a-lec.org'
SMTP_SECRET_FILE = "/home/secretaire/.secret/smtp_api_password"
# gestion_read("SELECT * FROM services_users su \
# INNER JOIN membres m ON m.id = su.id_user \
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
# LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
# LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
# WHERE m.id = 3 AND l.id_account = 481;")
def gestion_get_secret():
with open(GESTION_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
def smtp_get_secret():
with open(SMTP_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
def git_get_secret():
with open(GIT_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
@ -106,31 +86,10 @@ def setup_workdir():
if not "refuse" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/refuse")
# def sendmail(headers, data):
# print("***\nHEADERS: {}\n***\n".format(headers))
# msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
# subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def sendmail(headers, data):
print("***\nHEADERS: {}\n***\n".format(headers))
# Prepare the email
msg = MIMEMultipart()
for header in headers.split("\n"):
if ": " in header:
key, value = header.split(": ", 1)
msg[key] = value
msg.attach(MIMEText(data, 'plain'))
# Connect to the SMTP server and send the email
try:
server = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
server.starttls() # Upgrade the connection to a secure TLS connection
server.login(SMTP_USER, smtp_get_secret())
server.sendmail(SMTP_USER, msg['To'], msg.as_string())
server.quit()
#print("Email sent successfully")
except Exception as e:
print(f"Failed to send email: {e}")
msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def new_registrations_worker():
if (os.listdir(WORKDIR+"/nouveau") == []):

View File

@ -1,7 +1,7 @@
#!/usr/bin/python3
# Libre en Communs's cotisation control program
# Copyright (C) 2022-2024 Libre en Communs
# Copyright (C) 2022 Libre en Communs
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@ -19,13 +19,7 @@
import os, requests, json, datetime, shutil, quopri, subprocess, base64
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from email.utils import formataddr
from pathlib import Path
VERSION="1.0.0"
GESTION_SECRET_FILE="/home/tresorier/.secret/gestion_api_password"
@ -41,12 +35,6 @@ SENDMAIL_LOCATION = "/usr/sbin/sendmail" # sendmail location
BUF=[]
GITEA_URL = "https://forge.a-lec.org"
# Update these with your SMTP server details
SMTP_SERVER = 'mail.a-lec.org'
SMTP_PORT = 587
SMTP_USER = 'tresorier@a-lec.org'
SMTP_SECRET_FILE = "/home/tresorier/.secret/smtp_api_password"
# gestion_read("SELECT * FROM services_users su \
# INNER JOIN membres m ON m.id = su.id_user \
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
@ -58,10 +46,6 @@ def gestion_get_secret():
with open(GESTION_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
def smtp_get_secret():
with open(SMTP_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
def git_get_secret():
with open(GIT_SECRET_FILE) as sfile:
return sfile.readline().replace("\n", "")
@ -100,48 +84,18 @@ def setup_workdir():
if not "transition" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/transition")
# def sendmail(headers, data):
# msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
# subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def sendmail(headers, data):
msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
# def sendmail_with_attachment(headers, data, attachment_header, attachment, ending):
# msg = bytes(headers + "\n", 'utf-8') \
# + quopri.encodestring(bytes(data, 'utf-8')) \
# + bytes(attachment_header + "\n", 'utf-8') \
# + base64.b64encode(attachment) \
# + bytes(ending, 'utf-8')
def sendmail_with_attachment(headers, data, attachment_header, attachment, ending):
msg = bytes(headers + "\n", 'utf-8') \
+ quopri.encodestring(bytes(data, 'utf-8')) \
+ bytes(attachment_header + "\n", 'utf-8') \
+ base64.b64encode(attachment) \
+ bytes(ending, 'utf-8')
# subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def sendmail_with_attachment(headers, data, attachment_path, filename):
# Parse headers
msg = MIMEMultipart()
for header in headers.split("\n"):
if ": " in header:
key, value = header.split(": ", 1)
msg[key] = value
# Add the email body
body = MIMEText(data, 'plain', 'utf-8')
msg.attach(body)
# Add the attachment
attachment = MIMEBase('application', 'octet-stream')
attachment.set_payload(Path(attachment_path).read_bytes())
encoders.encode_base64(attachment)
attachment.add_header('Content-Disposition', f'attachment; filename={filename}')
msg.attach(attachment)
# Send the email
try:
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls() # Start TLS encryption
server.login(SMTP_USER, smtp_get_secret())
server.sendmail(SMTP_USER, msg['To'], msg.as_string())
print("Email sent successfully")
except Exception as e:
print(f"Failed to send email: {e}")
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
def gestion_get_expired():
request_expired = "SELECT id_user FROM services_users su " +\
@ -442,15 +396,23 @@ def validate(member):
set_file_content(WORKDIR+"/validé/"+filename+".tex", latexfile)
os.system("cd {} && pdflatex {}".format(WORKDIR+"/validé/", filename+".tex"))
# Preparing the email
mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace("COURRIEL-COTISANT", answer["email"]) + "\n"
mailtext = get_file_content_all(RECEPT_MAIL).replace("ANNEE-CIVILE", answer["date"][:4]) + "\n"
# Preparing mail
mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace("COURRIEL-COTISANT",
answer["email"]) + "\n"
mailtext = get_file_content_all(RECEPT_MAIL).replace("ANNEE-CIVILE",
answer["date"][:4]) + "\n"
mailtattach = get_file_content_all(RECEPT_MAIL_ATTACHMENT) + "\n"
# Sending the email with the attached PDF
pdf_path = WORKDIR+"/validé/"+filename+".pdf"
sendmail_with_attachment(mailheaders, mailtext, pdf_path, filename + ".pdf")
# Opening PDF file as binary
attachment = open(WORKDIR+"/validé/"+filename+".pdf", "rb")
data = attachment.read()
attachment.close()
# Cleanup
ending = "--------------3yxkFgv0AINs5nd0i6BJrWaV--"
sendmail_with_attachment(mailheaders, mailtext, mailtattach, data, ending)
# The end
os.remove(WORKDIR+"/transition/"+member)
def validate_members():

View File

@ -1,7 +1,7 @@
#!/usr/bin/python3
# Libre en Communs's mecene control program
# Copyright (C) 2022-2024 Libre en Communs
# Copyright (C) 2022 Libre en Communs
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@ -16,42 +16,33 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import requests
import datetime
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import os, requests, json, datetime, shutil, quopri, subprocess, base64
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
VERSION = "1.0.0"
GESTION_SECRET_FILE = "/home/tresorier/.secret/gestion_api_password"
WORKDIR = "/srv/validation_don.d"
MODALITY_MAIL = "mail_instructions.txt"
MODALITY_MAIL_HEADERS = "mail_instructions_headers.txt"
RECEPT_MAIL = "mail_recu.txt"
RECEPT_MAIL_HEADERS = "mail_recu_headers.txt"
RECEPT_MAIL_ATTACHMENT = "mail_recu_attachment.txt"
SUMMARY_MAIL = "mail_recap_header.txt"
# SMTP settings (adjust as necessary)
SMTP_SERVER = 'mail.a-lec.org'
SMTP_PORT = 587
SMTP_USER = 'tresorier@a-lec.org'
SMTP_SECRET_FILE = "/home/tresorier/.secret/smtp_api_password"
VERSION="1.0.0"
GESTION_SECRET_FILE="/home/tresorier/.secret/gestion_api_password"
WORKDIR="/srv/validation_don.d"
MODALITY_MAIL="mail_instructions.txt"
MODALITY_MAIL_HEADERS="mail_instructions_headers.txt"
RECEPT_MAIL="mail_recu.txt"
RECEPT_MAIL_HEADERS="mail_recu_headers.txt"
RECEPT_MAIL_ATTACHMENT="mail_recu_attachment.txt"
SUMMARY_MAIL="mail_recap_header.txt"
SENDMAIL_LOCATION = "/usr/sbin/sendmail" # sendmail location
BUF=[]
BUF = []
# gestion_read("SELECT * FROM services_users su \
# INNER JOIN membres m ON m.id = su.id_user \
# INNER JOIN services_fees sf ON sf.id = su.id_fee \
# LEFT JOIN acc_transactions_users tu ON tu.id_service_user = su.id \
# LEFT JOIN acc_transactions_lines l ON l.id_transaction = tu.id_transaction \
# WHERE m.id = 3 AND l.id_account = 481;")
# Utility functions
def gestion_get_secret():
with open(GESTION_SECRET_FILE) as sfile:
return sfile.readline().strip()
def smtp_get_secret():
with open(SMTP_SECRET_FILE) as sfile:
return sfile.readline().strip()
return sfile.readline().replace("\n", "")
def get_file_content(filename):
with open(filename) as sfile:
@ -67,95 +58,90 @@ def set_file_content(filename, lines):
def gestion_read(req):
response = requests.post('https://gestion.a-lec.org/api/sql/',
auth=HTTPBasicAuth('api666', gestion_get_secret()),
data=req)
auth = HTTPBasicAuth('api666', gestion_get_secret()),
data = req)
return response.json()
def setup_workdir():
for subdir in ['transition', 'nouveau', 'validé']:
path = os.path.join(WORKDIR, subdir)
if not os.path.exists(path):
os.makedirs(path)
if not os.path.isdir(WORKDIR):
os.mkdir(WORKDIR)
if not "transition" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/transition")
if not "nouveau" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/nouveau")
if not "validé" in os.listdir(WORKDIR):
os.mkdir(WORKDIR+"/validé")
# Send email with attachment via SMTP, password fetched from file
def sendmail_with_attachment(headers, body, attachment_path, attachment_filename):
msg = MIMEMultipart()
def sendmail(headers, data):
msg = bytes(headers + "\n", 'utf-8') + quopri.encodestring(bytes(data, 'utf-8'))
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
# Add headers
for header in headers.split("\n"):
if ": " in header:
key, value = header.split(": ", 1)
msg[key] = value.strip()
def sendmail_with_attachment(headers, data, attachment_header, attachment, ending):
msg = bytes(headers + "\n", 'utf-8') \
+ quopri.encodestring(bytes(data, 'utf-8')) \
+ bytes(attachment_header + "\n", 'utf-8') \
+ base64.b64encode(attachment) \
+ bytes(ending, 'utf-8')
# Email body
msg.attach(MIMEText(body, 'plain', 'utf-8'))
subprocess.run([SENDMAIL_LOCATION, "-t", "-oi"], input=msg)
# Attach PDF file
if attachment_path:
with open(attachment_path, "rb") as attachment:
part = MIMEBase('application', 'octet-stream')
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', f'attachment; filename={attachment_filename}')
msg.attach(part)
# Send email
try:
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls() # Secure the connection
server.login(SMTP_USER, smtp_get_secret()) # Fetch SMTP password from file
server.sendmail(SMTP_USER, msg['To'], msg.as_string())
#print(f"Email sent to {msg['To']}")
except Exception as e:
print(f"Error sending email: {e}")
# Notify unpaid donors
def notify_unpaid(record):
content = get_file_content_all(os.path.join(WORKDIR, "transition", record)).split("\n")
# Check if notified
content = get_file_content_all(WORKDIR+"/transition/"+record).split("\n")
if len(content) > 8 and "notified" in content[8]:
return # Already notified
return
# Parse donor information
name, surname, address, postal_code, city, email, amount, mode = content[:8]
# Get infos
name, surname, address, postal_code, city, email, amount, mode = \
get_file_content_all(WORKDIR+"/transition/"+record).split("\n")[:8]
BUF.append(f"* {record} {name}, {amount}")
BUF.append("* {} {}, {}".format(record, name, amount))
BUF.append(" NOTIFICATION DONATEUR")
BUF.append("")
mailtext = get_file_content_all(MODALITY_MAIL)
mailtext = mailtext.replace("NOM_DONNEUR", f"{surname} {name}")
mailtext = get_file_content_all(MODALITY_MAIL) + "\n"
mailtext = mailtext.replace("NOM_DONNEUR", surname+" "+name)
mailtext = mailtext.replace("NUMERO_DON", str(record))
mailtext = mailtext.replace("MONTANT_DON", amount)
mailtext = mailtext.replace("MODE_DON", mode)
mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS)
mailheaders = get_file_content_all(MODALITY_MAIL_HEADERS) + "\n"
mailheaders = mailheaders.replace("COURRIEL_DONNEUR", email)
# Notify
sendmail_with_attachment(mailheaders, mailtext, None, None)
sendmail(mailheaders, mailtext)
# Mark as notified
with open(os.path.join(WORKDIR, "transition", record), "a") as sfile:
sfile.write("notified\n")
# Indicate as notified
with open(WORKDIR+"/transition/"+record, "a") as sfile:
sfile.write("notified")
# Validate donation records
def validate(record):
request = f"SELECT * FROM acc_transactions tr INNER JOIN acc_transactions_lines l " \
f"ON tr.id = l.id_transaction WHERE tr.notes LIKE '%{record}%' AND id_account = 469"
# Get infos
request = "SELECT * FROM acc_transactions tr " +\
"INNER JOIN acc_transactions_lines l " +\
" ON tr.id = l.id_transaction " +\
"WHERE tr.notes LIKE '%{}%' and id_account = 469".format(record)
# Note: su.id_service = 1 parceque la cotisation correspond au service 1
try:
answer = gestion_read(request)["results"][-1]
except IndexError:
return False # If no result, donation not validated
except:
return False
# Parse donor information
name, surname, address, postal_code, city, email, amount, mode = \
get_file_content_all(os.path.join(WORKDIR, "transition", record)).split("\n")[:8]
get_file_content_all(WORKDIR+"/transition/"+record).split("\n")[:8]
date = datetime.datetime.strptime(answer["date"], '%Y-%m-%d').strftime("%d/%m/%Y")
filename = f"{name}_reçu_{record}_{date.replace('/', '.')}"
date = datetime.datetime.strptime(
answer["date"],'%Y-%m-%d').strftime("%d/%m/%Y")
BUF.append(f"* {record} {name}, {amount}")
filename = "{}_reçu_{}_{}".format(
name,
record,
date.replace("/", "."))
BUF.append("* {} {}, {}".format(record, name, "{},{}".format(
str(answer["credit"])[:-2],
str(answer["credit"])[-2:])))
BUF.append(" VALIDATION DON")
BUF.append("")
@ -164,68 +150,95 @@ def validate(record):
latexfile = latexfile.replace("NUMERO-DON", record)
latexfile = latexfile.replace("ANNEE-CIVILE", answer["date"][:4])
latexfile = latexfile.replace("NOM-DONATEUR", name)
latexfile = latexfile.replace("STATUT-DONATEUR", "Personne physique")
latexfile = latexfile.replace("ADRESSE-DONATEUR", f"{address}, {postal_code} {city}")
latexfile = latexfile.replace("SOMME", f"{str(answer['credit'])[:-2]},{str(answer['credit'])[-2:]}")
latexfile = latexfile.replace("STATUT-DONATEUR", "Personne physique") # XXX
latexfile = latexfile.replace("ADRESSE-DONATEUR", "{}, {} {}".format(
address,
postal_code,
city))
latexfile = latexfile.replace("SOMME", "{},{}".format(
str(answer["credit"])[:-2],
str(answer["credit"])[-2:]))
latexfile = latexfile.replace("DATE-VERSEMENT", date)
latexfile = latexfile.replace("MODE-VERSEMENT", answer["reference"])
latexfile = latexfile.replace("FORME-DON", "Déclaration de don manuel")
latexfile = latexfile.replace("NATURE-DON", "Numéraire")
latexfile = latexfile.replace("FORME-DON", "Déclaration de don manuel") # XXX
latexfile = latexfile.replace("NATURE-DON", "Numéraire") # XXX
# Save the LaTeX file and generate the PDF
tex_file_path = os.path.join(WORKDIR, "validé", f"{filename}.tex")
set_file_content(tex_file_path, latexfile)
os.system(f"cd {WORKDIR}/validé/ && pdflatex {filename}.tex")
try:
os.remove(WORKDIR+"/validé/"+filename+".tex")
except:
pass
set_file_content(WORKDIR+"/validé/"+filename+".tex", latexfile)
os.system("cd {} && pdflatex {}".format(WORKDIR+"/validé/", filename+".tex"))
# Prepare and send email with attachment
mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace("COURRIEL-DON", email)
mailtext = get_file_content_all(RECEPT_MAIL).replace("DATE-DON", date)
mailtattach = get_file_content_all(RECEPT_MAIL_ATTACHMENT).replace("DATE-DON", f"{record}_{date.replace('/', '.')}")
pdf_path = os.path.join(WORKDIR, "validé", f"{filename}.pdf")
# Preparing mail
mailheaders = get_file_content_all(RECEPT_MAIL_HEADERS).replace(
"COURRIEL-DON",
email) + "\n"
mailtext = get_file_content_all(RECEPT_MAIL).replace("DATE-DON", date) + "\n"
mailtattach = get_file_content_all(RECEPT_MAIL_ATTACHMENT).replace(
"DATE-DON",
record + "_" + date.replace("/", ".")) + "\n"
sendmail_with_attachment(mailheaders, mailtext, pdf_path, f"{filename}.pdf")
# Opening PDF file as binary
attachment = open(WORKDIR+"/validé/"+filename+".pdf", "rb")
data = attachment.read()
attachment.close()
ending = "--------------3yxkFgv0AINs5nd0i6BJrWaV--"
sendmail_with_attachment(mailheaders, mailtext, mailtattach, data, ending)
# The end
os.remove(WORKDIR+"/transition/"+record)
# Clean up
os.remove(os.path.join(WORKDIR, "transition", record))
return True
# Process new donation records
def check_record(intent):
numero, content = get_file_content_all(intent).split("|")
name, surname, address, postal_code, city, email, amount, mode = content.split(";")[:8]
name, surname, address, postal_code, city, email, amount, mode = \
content.split(";")[:8]
BUF.append(f"* {numero} {name} {surname}, {amount}")
BUF.append("* {} {}, {}".format(numero, name+" "+surname, amount))
BUF.append(" NOUVEAU DON")
BUF.append("")
lines = [f"{name}\n{surname}\n{address}\n{postal_code}\n{city}\n{email}\n{amount}\n{mode}\n"]
set_file_content(os.path.join(WORKDIR, "transition", numero), lines)
os.remove(intent) # Clean up processed intent file
lines = [ "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n".format(
name, surname, address, postal_code, city, email, amount, mode
) ]
set_file_content(WORKDIR+"/transition/"+numero, lines)
if numero in os.listdir(WORKDIR+"/transition/"):
os.remove(intent)
# Main donation validation workflow
def validate_donors():
# Process new donation intents
for new_intent in os.listdir(os.path.join(WORKDIR, "nouveau")):
check_record(os.path.join(WORKDIR, "nouveau", new_intent))
# Validate existing records or notify unpaid
for record in os.listdir(os.path.join(WORKDIR, "transition")):
# Get new
for new_intent in os.listdir(WORKDIR+"/nouveau"):
check_record(WORKDIR+"/nouveau/"+new_intent)
# Validate record
for record in os.listdir(WORKDIR+"/transition"):
if not validate(record):
notify_unpaid(record)
# Main function
def main():
setup_workdir()
#GET DATA()
validate_donors()
# Send summary email
# End of work
# Launch summary mail
mailheaders = get_file_content_all(SUMMARY_MAIL) + "\n"
mailtext = ""
if BUF:
mailtext = "\n".join(BUF)
sendmail_with_attachment(mailheaders, mailtext, None, None)
is_sendable = False;
for line in BUF:
mailtext += line + "\n"
is_sendable = True;
print(line)
if is_sendable:
sendmail(mailheaders, mailtext)
## Bootstrap
if __name__ == '__main__':
main()