280 lines
14 KiB
Python
Executable File
280 lines
14 KiB
Python
Executable File
import logging
|
||
import requests
|
||
import json
|
||
import functions.get_filters_from_text
|
||
import functions.get_newest_count
|
||
import var
|
||
from functions.send_message import send_message
|
||
from functions.roll_token import roll_token
|
||
from beloved_logger import logger
|
||
from functions.update_display_name import update_display_name
|
||
from functions.get_reports import get_reports
|
||
|
||
|
||
async def get_chat_messages():
    """Fetch the latest messages of the configured chat.

    Refreshes the tokens with ``roll_token()`` and then requests
    ``https://{var.site}/chats/{var.chat_number}/messages`` (with ``n=30``)
    until the server answers with HTTP 200.

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        requests.RequestException: connection-level failures other than a
            timeout are not handled here and propagate to the caller.
    """
    # Function-scope import: only the retry back-off below needs asyncio.
    import asyncio

    request_timeout_seconds = 30
    retry_delay_seconds = 5

    await roll_token()

    cookies = {
        var.cookie_name: var.cookie_value,
        'sketchers_united_session': var.session,
        'XSRF-TOKEN': var.xsrf_token,
    }

    # Only the cookie names are logged: the values are live session
    # credentials and must not end up in log files.
    logger.debug(f"Getting chat messages with cookies {list(cookies)}")

    headers = {
        'User-Agent': 'automod',
        'Accept': 'application/json, */*',
        'X-CSRF-TOKEN': var.csrf_token,
        'X-XSRF-TOKEN': var.xsrf_token,
    }

    params = {
        'n': '30',
    }

    url = f'https://{var.site}/chats/{var.chat_number}/messages'

    while True:
        logging.info(f"Getting messages for {var.chat_number}")
        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(url, params=params,
                                    cookies=cookies,
                                    headers=headers,
                                    timeout=request_timeout_seconds)
        except requests.Timeout:
            # A timeout is treated like any other failed attempt: retry.
            logging.error(f"An issue occurred getting the messages "
                          f"for {var.chat_number} with error timeout")
        else:
            if response.status_code == 200:
                return response.json()

            logging.error(f"An issue occurred getting the messages "
                          f"for {var.chat_number} with error {response.status_code}")

        # Pause between attempts so a failing endpoint is not hammered and
        # other coroutines get a chance to run.
        await asyncio.sleep(retry_delay_seconds)
|
||
|
||
|
||
async def check_chat():
    """Poll the chat and execute any new ``@automod`` commands.

    Fetches the current messages with ``get_chat_messages()``, works out
    which ones are not yet in ``var.old_chat`` and, for each new message
    whose first eight characters contain ``@automod``, runs the matching
    command (status, help, text/image scanning controls, reports).
    Finally stores the fetched messages in ``var.old_chat``.

    On the very first call (``var.old_chat`` empty) the fetched messages
    become the baseline, so nothing already in the chat is executed.
    """
    new_chat = await get_chat_messages()
    if len(var.old_chat) == 0:
        logging.info("No chat messages were loaded before, so setting the old chat value to the new one")
        var.old_chat = new_chat

    # A set keeps the "already seen" check O(1) per message.
    old_ids = {message["id"] for message in var.old_chat}

    newest_messages = sorted(
        (message for message in new_chat if message["id"] not in old_ids),
        key=lambda message: message["id"],
    )

    # At most 15 messages are handled per poll, lowest id first.
    # NOTE(review): any further new messages are never handled, because
    # var.old_chat is overwritten below - confirm this is intended.
    for x in newest_messages[:15]:
        text = x['text']

        # Only messages that start with the mention are commands.
        if "@automod" not in text[:8]:
            continue

        logging.info(f"Somebody mentioned me with {text}")

        if "@automod status" in text:
            if var.image_scanning_is_running:
                if var.scan_new_users_only:
                    image_scan_status = "Yes, but only for users who have signed up in the past 2 weeks"
                else:
                    image_scan_status = "Yes"
            else:
                image_scan_status = "No"

            text_scan_status = "Yes" if var.text_scanning_is_running else "No"

            await send_message("*🕰 Status*\n"
                               f"🖼️️ *Is image scanning on?:* {image_scan_status}\n"
                               f"💬 *Is text scanning on?:* {text_scan_status}\n"
                               f"🏃♀️ *Running since:* {var.bot_started_at}\n"
                               f"❓ Try typing '@automod help'")

        elif "@automod i love you" in text:
            await send_message(f"💌 I love you too @{x['profile']['username']}...")

        elif "@automod ping" in text:
            await send_message("☹️ I've been pinged")

        elif "@automod help" in text:
            # The continuation lines sit at column 0 on purpose: leading
            # whitespace would become part of the chat message.
            await send_message("""⌨️ Prefix commands with @automod e.g. @automod ping
ℹ️ *Informational*
status
i love you
ping

🖼️ *Image scanning*
imagescanning on
imagescanning off
imagescanning list
🛑 imagescanning enable _label_
🛑 imagescanning disable _label_

💬️ *Text scanning*
textscanning on
textscanning off
textscanning list
textscanning add _word_
textscanning remove _word_

❗ *If I didn't reply to your message, I am likely subject to a rate limit.*
🙂 I still did something though, I just didn't tell you about it.""")

        #
        # TEXT SCANNING
        #

        elif "@automod textscanning on" in text:
            if var.text_scanning_is_running:
                logging.info("Text scanning attempted to be turned on but was already running.")
                await send_message("💬️ Text scanning is already running!")
            else:
                var.current_posts_at_start_time = await functions.get_newest_count.get_newest_count()
                var.text_scanning_is_running = True
                logging.info("Turned on textscanning")
                await send_message(f"💬 Text scanning has been enabled by @{x['profile']['username']}")
                await update_display_name()

        elif "@automod textscanning off" in text:
            # Checks the text scanning flag; the image scanning flag was
            # tested here before, so this branch followed the wrong state.
            if var.text_scanning_is_running is False:
                logging.info("Text scanning attempted to be turning off but was already off")
                await send_message("🖼️ Text scanning is already off!")
            else:
                var.text_scanning_is_running = False
                logging.info("Turning off textscanning")
                await send_message(f"💬 Text scanning has been disabled by @{x['profile']['username']}")
                await update_display_name()

        elif "@automod textscanning list" in text:
            logging.info("Retrieving textscanning list")
            with open(var.filter_file, 'r') as file:
                lines = [line.strip() for line in file if line.strip()]

            lines_joined = ', '.join(lines)
            logging.debug(f"Currently filtered words retrieved as {lines_joined}")

            await send_message(f"*Current filtered words*\n"
                               f"{lines_joined}")

        elif "@automod textscanning add" in text:
            parts = text.split()
            # Exactly "@automod textscanning add <word>" -> four tokens.
            if len(parts) == 4:
                text_to_add = parts[3]
                logging.info(f"Adding new word {text_to_add} to list {var.filters_list}")

                if text_to_add in var.filters_list:
                    logging.info(f"Word {text_to_add} was already in the list {var.filters_list}")
                    await send_message(f"‼️ '{text_to_add}' is already in the filtered words list!")
                else:
                    with open(var.filter_file, 'a') as file:
                        logging.info(f"Writing {text_to_add} to {var.filter_file}")
                        file.write(text_to_add + '\n')

                    await send_message(f"🎉 Added {text_to_add} to the filtered words list.")
                    var.filters_list = await functions.get_filters_from_text.get_filters_from_text(var.filter_file)
            else:
                logging.info("Adding of a textscanning word was called without any word provided.")
                await send_message(f"‼️ You have to provide a word to add to the filtered words list!"
                                   f"\ne.g. '@automod textscanning add _sketch_' will add "
                                   f"the word 'sketch' to the filtered words list.")

        elif "@automod textscanning remove" in text:
            parts = text.split()
            # Exactly "@automod textscanning remove <word>" -> four tokens.
            if len(parts) == 4:
                text_to_remove = parts[3]
                logging.info(f"Removing word {text_to_remove} from list {var.filters_list}")

                if text_to_remove not in var.filters_list:
                    logging.info(f"{text_to_remove} was not a pre-existing filtered word in {var.filters_list}")
                    await send_message(f"⁉️ '{text_to_remove}' was not a pre-existing filtered word.")
                else:
                    with open(var.filter_file, 'r') as file:
                        lines = file.readlines()

                    logging.info(f"Taking away {text_to_remove} from {var.filter_file}")
                    # Drop only the line that IS the word. A substring
                    # replace on every line would also mangle longer
                    # filter words that merely contain it.
                    modified_lines = [line for line in lines if line.strip() != text_to_remove]

                    with open(var.filter_file, 'w') as file:
                        logging.info(f"Writing the new file back to {var.filter_file}")
                        file.writelines(modified_lines)

                    var.filters_list = await functions.get_filters_from_text.get_filters_from_text(var.filter_file)
                    await send_message(f"🎉 Removed '{text_to_remove}' from the filtered words list.")
            else:
                logging.info("Removal of a textscanning word was called without any word provided.")
                await send_message(f"‼️ You have to provide a word to remove from the filtered words list!"
                                   f"\ne.g. '@automod textscanning remove _sketch_' will remove "
                                   f"the word 'sketch' from the filtered words list.")

        #
        # IMAGE SCANNING
        #

        elif "@automod imagescanning on" in text:
            if var.image_scanning_is_running:
                logging.info("Image scanning attempted to be turned on but was already running.")
                await send_message("🖼️ Image scanning is already running!")
            else:
                var.current_posts_at_start_time = await functions.get_newest_count.get_newest_count()
                var.image_scanning_is_running = True
                logging.info("Image scanning was turned on")
                await send_message(f"🖼️ Image scanning has been enabled by @{x['profile']['username']}")
                await update_display_name()

        elif "@automod imagescanning off" in text:
            if var.image_scanning_is_running is False:
                logging.info("Image scanning attempted to be turning off but was already off")
                await send_message("🖼️ Image scanning is already off!")
            else:
                var.image_scanning_is_running = False
                logging.info("Image scanning was turned off")
                await send_message(f"🖼️ Image scanning has been disabled by @{x['profile']['username']}")
                await update_display_name()

        elif "@automod imagescanning list" in text:
            with open(var.labels_file, 'r') as file:
                logging.info(f"Showing list of imagescanning labels from {var.labels_file}")
                data = json.load(file)

            top_level_true = []
            top_level_false = []
            lower_level_true = []
            lower_level_false = []

            for top_level_object, top_level_data in data.items():
                if top_level_data.get("status") is True:
                    top_level_true.append(top_level_object)
                elif top_level_data.get("status") is False:
                    top_level_false.append(top_level_object)

                # Every key other than "status" is a lower level label.
                for lower_level_object, lower_level_status in top_level_data.items():
                    if lower_level_object == "status":
                        continue
                    if lower_level_status is True:
                        lower_level_true.append(f"{lower_level_object}")
                    elif lower_level_status is False:
                        lower_level_false.append(f"{lower_level_object}")

            # Built once, then both logged and sent.
            label_summary = (f"*❌ Disabled:* \n"
                             f" _Top level:_ {', '.join(top_level_false)}, \n"
                             f" _Lower level:_ {', '.join(lower_level_false)}\n"
                             f"*✅ Enabled:* \n"
                             f" _Top level:_ {', '.join(top_level_true)}, \n"
                             f" _Lower level:_ {', '.join(lower_level_true)}")

            logging.debug(label_summary)
            await send_message(label_summary)

        # Matching on "enable"/"disable" accepts both the spelling shown
        # in the help text and the longer "enablefilter"/"disablefilter".
        elif "@automod imagescanning enable" in text:
            await send_message("Not yet implemented")

        elif "@automod imagescanning disable" in text:
            await send_message("Not yet implemented")

        elif "@automod reports" in text:
            reports = await get_reports()
            await send_message(reports)

        else:
            await send_message("Invalid command given")

    # Remember what has been seen so the next poll only handles newer messages.
    var.old_chat = new_chat
|