su-automod/functions/check_chat.py

275 lines
13 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import requests
import json
import functions.get_filters_from_text
import functions.get_newest_count
import var
from functions.send_message import send_message
from functions.roll_token import roll_token
from beloved_logger import logger
from functions.update_display_name import update_display_name
# Seconds to wait for the server before giving up on a single request.
_REQUEST_TIMEOUT_SECONDS = 30
# Seconds to pause between failed attempts so a failing server is not hammered.
_RETRY_DELAY_SECONDS = 5


async def get_chat_messages():
    """Fetch the latest messages of the configured chat.

    Rolls the token via ``roll_token()``, then requests
    ``https://{var.site}/chats/{var.chat_number}/messages`` with ``n=30``
    and keeps retrying until the server answers with HTTP 200.

    Returns:
        The decoded JSON body of the first successful response.

    Notes:
        * Retries are unbounded (as before), but now pause between attempts
          instead of spinning in a tight loop.
        * Transport errors (timeouts, connection failures) are logged and
          retried as well, rather than escaping the retry loop.
        * ``requests.get`` is a blocking call; the event loop is blocked for
          the duration of each request.
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    await roll_token()
    cookies = {
        var.cookie_name: var.cookie_value,
        'sketchers_united_session': var.session,
        'XSRF-TOKEN': var.xsrf_token
    }
    # Only the cookie names are logged: the values are live session
    # credentials and must not end up in log files.
    logger.debug(f"Getting chat messages with cookies {list(cookies)}")
    headers = {
        'User-Agent': 'automod',
        'Accept': 'application/json, */*',
        'X-CSRF-TOKEN': var.csrf_token,
        'X-XSRF-TOKEN': var.xsrf_token,
    }
    params = {
        'n': '30',
    }
    while True:
        logging.info(f"Getting messages for {var.chat_number}")
        try:
            response = requests.get(f'https://{var.site}/chats/{var.chat_number}/messages',
                                    params=params,
                                    cookies=cookies,
                                    headers=headers,
                                    timeout=_REQUEST_TIMEOUT_SECONDS)
        except requests.RequestException as error:
            logging.error(f"An issue occurred getting the messages "
                          f"for {var.chat_number} with error {error}")
        else:
            if response.status_code == 200:
                return response.json()
            logging.error(f"An issue occurred getting the messages "
                          f"for {var.chat_number} with error {response.status_code}")
        # Back off before the next attempt and let other coroutines run.
        await asyncio.sleep(_RETRY_DELAY_SECONDS)
# Reply sent for '@automod help'.
_HELP_TEXT = """⌨️ Prefix commands with @automod e.g. @automod ping
*Informational*
status
i love you
ping
🖼️ *Image scanning*
imagescanning on
imagescanning off
imagescanning list
🛑 imagescanning enable _label_
🛑 imagescanning disable _label_
💬️ *Text scanning*
textscanning on
textscanning off
textscanning list
textscanning add _word_
textscanning remove _word_
❗ *If I didn't reply to your message, I am likely subject to a rate limit.*
🙂 I still did something though, I just didn't tell you about it."""


def _build_status_message():
    """Return the reply for '@automod status' from the current ``var`` flags."""
    if var.image_scanning_is_running:
        if var.scan_new_users_only:
            image_scan_status = "Yes, but only for users who have signed up in the past 2 weeks"
        else:
            image_scan_status = "Yes"
    else:
        image_scan_status = "No"
    text_scan_status = "Yes" if var.text_scanning_is_running else "No"
    return ("*🕰 Status*\n"
            f"🖼️️ *Is image scanning on?:* {image_scan_status}\n"
            f"💬 *Is text scanning on?:* {text_scan_status}\n"
            f"🏃‍♀️ *Running since:* {var.bot_started_at}\n"
            f"❓ Try typing '@automod help'")


def _summarise_labels(data):
    """Return the enabled/disabled label summary for a parsed labels file.

    *data* maps each top-level label to a dict. Its ``"status"`` entry decides
    whether the top-level label is enabled; every other entry whose value is
    exactly ``True`` or ``False`` is reported as a lower-level label.
    """
    top_level_true = []
    top_level_false = []
    lower_level_true = []
    lower_level_false = []
    for top_level_object, top_level_data in data.items():
        status = top_level_data.get("status")
        if status is True:
            top_level_true.append(top_level_object)
        elif status is False:
            top_level_false.append(top_level_object)
        for lower_level_object, lower_level_status in top_level_data.items():
            if lower_level_object == "status":
                continue
            if lower_level_status is True:
                lower_level_true.append(f"{lower_level_object}")
            elif lower_level_status is False:
                lower_level_false.append(f"{lower_level_object}")
    return (f"*❌ Disabled:* \n"
            f" _Top level:_ {', '.join(top_level_false)}, \n"
            f" _Lower level:_ {', '.join(lower_level_false)}\n"
            f"*✅ Enabled:* \n"
            f" _Top level:_ {', '.join(top_level_true)}, \n"
            f" _Lower level:_ {', '.join(lower_level_true)}")


async def _add_filter_word(text):
    """Handle '@automod textscanning add <word>': append the word to the filter file."""
    parts = text.split()
    if len(parts) != 4:
        logging.info("Adding of a textscanning word was called without any word provided.")
        await send_message(f"‼️ You have to provide a word to add to the filtered words list!"
                           f"\ne.g. '@automod textscanning add _sketch_' will add "
                           f"the word 'sketch' to the filtered words list.")
        return
    text_to_add = parts[3]
    logging.info(f"Adding new word {text_to_add} to list {var.filters_list}")
    if text_to_add in var.filters_list:
        logging.info(f"Word {text_to_add} was already in the list {var.filters_list}")
        await send_message(f"‼️ '{text_to_add}' is already in the filtered words list!")
        return
    with open(var.filter_file, 'a') as file:
        logging.info(f"Writing {text_to_add} to {var.filter_file}")
        file.write(text_to_add + '\n')
    await send_message(f"🎉 Added {text_to_add} to the filtered words list.")
    # Reload only after the file is closed so the new word is on disk.
    var.filters_list = await functions.get_filters_from_text.get_filters_from_text(var.filter_file)


async def _remove_filter_word(text):
    """Handle '@automod textscanning remove <word>': delete the word from the filter file."""
    parts = text.split()
    if len(parts) != 4:
        logging.info("Removal of a textscanning word was called without any word provided.")
        await send_message(f"‼️ You have to provide a word to remove from the filtered words list!"
                           f"\ne.g. '@automod textscanning remove _sketch_' will remove "
                           f"the word 'sketch' from the filtered words list.")
        return
    text_to_remove = parts[3]
    logging.info(f"Removing word {text_to_remove} from list {var.filters_list}")
    if text_to_remove not in var.filters_list:
        logging.info(f"{text_to_remove} was not a pre-existing filtered word in {var.filters_list}")
        await send_message(f"⁉️ '{text_to_remove}' was not a pre-existing filtered word.")
        return
    with open(var.filter_file, 'r') as file:
        lines = file.readlines()
    logging.info(f"Taking away {text_to_remove} from {var.filter_file}")
    # Drop only the lines that ARE the word. A substring replace would also
    # mangle longer entries containing it and leave empty lines behind.
    modified_lines = [line for line in lines if line.strip() != text_to_remove]
    with open(var.filter_file, 'w') as file:
        logging.info(f"Writing the new file back to {var.filter_file}")
        file.writelines(modified_lines)
    var.filters_list = await functions.get_filters_from_text.get_filters_from_text(var.filter_file)
    await send_message(f"🎉 Removed '{text_to_remove}' from the filtered words list.")


async def _handle_command(message, text):
    """Execute the '@automod ...' command found in *text* and reply in chat.

    *message* is the chat message dict the command came from; its
    ``['profile']['username']`` is read only by the branches that name the
    sender in their reply. Branch order matters because commands are matched
    by substring.
    """
    if "@automod status" in text:
        await send_message(_build_status_message())
    elif "@automod i love you" in text:
        await send_message(f"💌 I love you too @{message['profile']['username']}...")
    elif "@automod ping" in text:
        await send_message("☹️ I've been pinged")
    elif "@automod help" in text:
        await send_message(_HELP_TEXT)
    #
    # TEXT SCANNING
    #
    elif "@automod textscanning on" in text:
        if var.text_scanning_is_running:
            logging.info("Text scanning attempted to be turned on but was already running.")
            await send_message("💬️ Text scanning is already running!")
        else:
            var.current_posts_at_start_time = await functions.get_newest_count.get_newest_count()
            var.text_scanning_is_running = True
            logging.info("Turned on textscanning")
            await send_message(f"💬 Text scanning has been enabled by @{message['profile']['username']}")
            await update_display_name()
    elif "@automod textscanning off" in text:
        # Must test the TEXT scanning flag; testing the image scanning flag
        # made it impossible to turn text scanning off while image scanning
        # was off.
        if not var.text_scanning_is_running:
            logging.info("Text scanning attempted to be turning off but was already off")
            await send_message("🖼️ Text scanning is already off!")
        else:
            var.text_scanning_is_running = False
            logging.info("Turning off textscanning")
            await send_message(f"💬 Text scanning has been disabled by @{message['profile']['username']}")
            await update_display_name()
    elif "@automod textscanning list" in text:
        logging.info("Retrieving textscanning list")
        with open(var.filter_file, 'r') as file:
            lines = [line.strip() for line in file if line.strip()]
        lines_joined = ', '.join(lines)
        logging.debug(f"Currently filtered words retrieved as {lines_joined}")
        await send_message(f"*Current filtered words*\n"
                           f"{lines_joined}")
    elif "@automod textscanning add" in text:
        await _add_filter_word(text)
    elif "@automod textscanning remove" in text:
        await _remove_filter_word(text)
    #
    # IMAGE SCANNING
    #
    elif "@automod imagescanning on" in text:
        if var.image_scanning_is_running:
            logging.info("Image scanning attempted to be turned on but was already running.")
            await send_message("🖼️ Image scanning is already running!")
        else:
            var.current_posts_at_start_time = await functions.get_newest_count.get_newest_count()
            var.image_scanning_is_running = True
            logging.info("Image scanning was turned on")
            await send_message(f"🖼️ Image scanning has been enabled by @{message['profile']['username']}")
            await update_display_name()
    elif "@automod imagescanning off" in text:
        if var.image_scanning_is_running is False:
            logging.info("Image scanning attempted to be turning off but was already off")
            await send_message("🖼️ Image scanning is already off!")
        else:
            var.image_scanning_is_running = False
            logging.info("Image scanning was turned off")
            await send_message(f"🖼️ Image scanning has been disabled by @{message['profile']['username']}")
            await update_display_name()
    elif "@automod imagescanning list" in text:
        with open(var.labels_file, 'r') as file:
            logging.info(f"Showing list of imagescanning labels from {var.labels_file}")
            data = json.load(file)
        summary = _summarise_labels(data)
        logging.debug(summary)
        await send_message(summary)
    # The help text advertises 'enable'/'disable'; matching on that prefix
    # also still accepts the older 'enablefilter'/'disablefilter' spelling.
    elif "@automod imagescanning enable" in text:
        await send_message("Not yet implemented")
    elif "@automod imagescanning disable" in text:
        await send_message("Not yet implemented")
    else:
        await send_message("Invalid command given")


async def check_chat():
    """Poll the chat once and act on any new '@automod' commands.

    Fetches the current messages with ``get_chat_messages()``, works out
    which ones were not present in ``var.old_chat``, and runs the command of
    every new message whose first 8 characters contain '@automod'. At most
    the 15 lowest-id new messages are processed per call. ``var.old_chat`` is
    replaced with the fetched messages afterwards.

    On the very first call (empty ``var.old_chat``) the fetched messages
    become the baseline, so nothing already in the chat is treated as new.
    """
    new_chat = await get_chat_messages()
    if not var.old_chat:
        logging.info("No chat messages were loaded before, so setting the old chat value to the new one")
        var.old_chat = new_chat
    # A set gives O(1) membership tests while filtering.
    old_ids = {message["id"] for message in var.old_chat}
    newest_messages = sorted(
        (message for message in new_chat if message["id"] not in old_ids),
        key=lambda message: message["id"],
    )
    for message in newest_messages[:15]:
        # Defensive: a message whose text is missing or null (if the API ever
        # returns one) is skipped instead of raising and aborting the poll.
        text = message.get("text") or ""
        if "@automod" in text[:8]:
            logging.info(f"Somebody mentioned me with {text}")
            await _handle_command(message, text)
    var.old_chat = new_chat