"""Chat polling and '@automod' command handling for the moderation bot."""

import asyncio
import json
import logging

import requests

import functions.get_filters_from_text
import functions.get_newest_count
import var
from beloved_logger import logger
from functions.get_reports import get_reports
from functions.roll_token import roll_token
from functions.send_message import send_message
from functions.update_display_name import update_display_name

# Seconds to wait for the chat endpoint before treating a request as failed.
_REQUEST_TIMEOUT_SECONDS = 30
# Pause between failed fetch attempts so the retry loop does not hammer the site.
_RETRY_DELAY_SECONDS = 5
# Upper bound on how many new messages are handled per check_chat() call.
_MAX_MESSAGES_PER_CHECK = 15

_HELP_TEXT = (
    "⌨️ Prefix commands with @automod e.g. @automod ping\n"
    "\n"
    "ℹ️ *Informational*\n"
    "status\n"
    "i love you\n"
    "ping\n"
    "\n"
    "🖼️ *Image scanning*\n"
    "imagescanning on\n"
    "imagescanning off\n"
    "imagescanning list\n"
    "🛑 imagescanning enable _label_\n"
    "🛑 imagescanning disable _label_\n"
    "\n"
    "💬 *Text scanning*\n"
    "textscanning on\n"
    "textscanning off\n"
    "textscanning list\n"
    "textscanning add _word_\n"
    "textscanning remove _word_\n"
    "\n"
    "❗ *If I didn't reply to your message, I am likely subject to a rate limit.*\n"
    "🙂 I still did something though, I just didn't tell you about it."
)


async def get_chat_messages():
    """Fetch the latest messages of the configured chat.

    Rolls the auth token, then requests the last 30 messages of
    ``var.chat_number`` from ``var.site``. The request is retried until the
    server answers with HTTP 200, pausing ``_RETRY_DELAY_SECONDS`` between
    attempts; network errors are logged and retried the same way.

    Returns:
        The decoded JSON body of the successful response.
    """
    await roll_token()

    cookies = {
        var.cookie_name: var.cookie_value,
        'sketchers_united_session': var.session,
        'XSRF-TOKEN': var.xsrf_token,
    }
    # Only the cookie names are logged: the values are live session secrets.
    logger.debug(f"Getting chat messages with cookies {list(cookies)}")

    headers = {
        'User-Agent': 'automod',
        'Accept': 'application/json, */*',
        'X-CSRF-TOKEN': var.csrf_token,
        'X-XSRF-TOKEN': var.xsrf_token,
    }
    params = {
        'n': '30',
    }
    url = f'https://{var.site}/chats/{var.chat_number}/messages'

    # NOTE(review): requests.get is a blocking call inside a coroutine; if other
    # tasks share this event loop, consider moving it to an executor.
    while True:
        logging.info(f"Getting messages for {var.chat_number}")
        try:
            response = requests.get(
                url,
                params=params,
                cookies=cookies,
                headers=headers,
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as error:
            logging.error(f"An issue occurred getting the messages "
                          f"for {var.chat_number} with error {error}")
        else:
            if response.status_code == 200:
                return response.json()
            logging.error(f"An issue occurred getting the messages "
                          f"for {var.chat_number} with error {response.status_code}")
        # Yield to the event loop and back off before trying again.
        await asyncio.sleep(_RETRY_DELAY_SECONDS)


async def _handle_status(message):
    """Report whether image/text scanning are running and since when."""
    if not var.image_scanning_is_running:
        image_scan_status = "No"
    elif var.scan_new_users_only:
        image_scan_status = "Yes, but only for users who have signed up in the past 2 weeks"
    else:
        image_scan_status = "Yes"
    text_scan_status = "Yes" if var.text_scanning_is_running else "No"

    await send_message("*🕰 Status*\n"
                       f"🖼️ *Is image scanning on?:* {image_scan_status}\n"
                       f"💬 *Is text scanning on?:* {text_scan_status}\n"
                       f"🏃‍♀️ *Running since:* {var.bot_started_at}\n"
                       f"❓ Try typing '@automod help'")


async def _handle_love(message):
    """Reply affectionately to the sender."""
    await send_message(f"💌 I love you too @{message['profile']['username']}...")


async def _handle_ping(message):
    """Acknowledge a ping."""
    await send_message("☹️ I've been pinged")


async def _handle_help(message):
    """Send the command overview."""
    await send_message(_HELP_TEXT)


#
# TEXT SCANNING
#
async def _handle_textscanning_on(message):
    """Enable text scanning unless it is already running."""
    if var.text_scanning_is_running:
        logging.info("Text scanning attempted to be turned on but was already running.")
        await send_message("💬 Text scanning is already running!")
        return

    var.current_posts_at_start_time = await functions.get_newest_count.get_newest_count()
    var.text_scanning_is_running = True
    logging.info("Turned on textscanning")
    await send_message(f"💬 Text scanning has been enabled by @{message['profile']['username']}")
    await update_display_name()


async def _handle_textscanning_off(message):
    """Disable text scanning unless it is already off."""
    # The text-scanning flag is the one that matters here, not the image one.
    if not var.text_scanning_is_running:
        logging.info("Text scanning attempted to be turning off but was already off")
        await send_message("💬 Text scanning is already off!")
        return

    var.text_scanning_is_running = False
    logging.info("Turning off textscanning")
    await send_message(f"💬 Text scanning has been disabled by @{message['profile']['username']}")
    await update_display_name()


async def _handle_textscanning_list(message):
    """Send the non-blank entries of the filter file as a comma-separated list."""
    logging.info("Retrieving textscanning list")
    with open(var.filter_file, 'r') as file:
        words = [line.strip() for line in file if line.strip()]
    lines_joined = ', '.join(words)
    logging.debug(f"Currently filtered words retrieved as {lines_joined}")
    await send_message(f"*Current filtered words*\n"
                       f"{lines_joined}")


async def _handle_textscanning_add(message):
    """Append a word to the filter file and reload ``var.filters_list``."""
    parts = message['text'].split()
    # Expected shape: "@automod textscanning add <word>" -> exactly four tokens.
    if len(parts) != 4:
        logging.info("Adding of a textscanning word was called without any word provided.")
        await send_message(f"‼️ You have to provide a word to add to the filtered words list!"
                           f"\ne.g. '@automod textscanning add _sketch_' will add "
                           f"the word 'sketch' to the filtered words list.")
        return

    text_to_add = parts[3]
    logging.info(f"Adding new word {text_to_add} to list {var.filters_list}")
    if text_to_add in var.filters_list:
        logging.info(f"Word {text_to_add} was already in the list {var.filters_list}")
        await send_message(f"‼️ '{text_to_add}' is already in the filtered words list!")
        return

    with open(var.filter_file, 'a') as file:
        logging.info(f"Writing {text_to_add} to {var.filter_file}")
        file.write(text_to_add + '\n')
    # Reload before replying so a failed send cannot leave the in-memory list stale.
    var.filters_list = await functions.get_filters_from_text.get_filters_from_text(var.filter_file)
    await send_message(f"🎉 Added {text_to_add} to the filtered words list.")


async def _handle_textscanning_remove(message):
    """Remove a word from the filter file and reload ``var.filters_list``."""
    parts = message['text'].split()
    # Expected shape: "@automod textscanning remove <word>" -> exactly four tokens.
    if len(parts) != 4:
        logging.info("Removal of a textscanning word was called without any word provided.")
        await send_message(f"‼️ You have to provide a word to remove from the filtered words list!"
                           f"\ne.g. '@automod textscanning remove _sketch_' will remove "
                           f"the word 'sketch' from the filtered words list.")
        return

    text_to_remove = parts[3]
    logging.info(f"Removing word {text_to_remove} from list {var.filters_list}")
    if text_to_remove not in var.filters_list:
        logging.info(f"{text_to_remove} was not a pre-existing filtered word in {var.filters_list}")
        await send_message(f"⁉️ '{text_to_remove}' was not a pre-existing filtered word.")
        return

    with open(var.filter_file, 'r') as file:
        lines = file.readlines()
    logging.info(f"Taking away {text_to_remove} from {var.filter_file}")
    # Drop only whole-line matches; a substring replace would mangle other
    # filter words that merely contain the removed word.
    kept_lines = [line for line in lines if line.strip() != text_to_remove]
    with open(var.filter_file, 'w') as file:
        logging.info(f"Writing the new file back to {var.filter_file}")
        file.writelines(kept_lines)
    var.filters_list = await functions.get_filters_from_text.get_filters_from_text(var.filter_file)
    await send_message(f"🎉 Removed '{text_to_remove}' from the filtered words list.")


#
# IMAGE SCANNING
#
async def _handle_imagescanning_on(message):
    """Enable image scanning unless it is already running."""
    if var.image_scanning_is_running:
        logging.info("Image scanning attempted to be turned on but was already running.")
        await send_message("🖼️ Image scanning is already running!")
        return

    var.current_posts_at_start_time = await functions.get_newest_count.get_newest_count()
    var.image_scanning_is_running = True
    logging.info("Image scanning was turned on")
    await send_message(f"🖼️ Image scanning has been enabled by @{message['profile']['username']}")
    await update_display_name()


async def _handle_imagescanning_off(message):
    """Disable image scanning unless it is already off."""
    if var.image_scanning_is_running is False:
        logging.info("Image scanning attempted to be turning off but was already off")
        await send_message("🖼️ Image scanning is already off!")
        return

    var.image_scanning_is_running = False
    logging.info("Image scanning was turned off")
    await send_message(f"🖼️ Image scanning has been disabled by @{message['profile']['username']}")
    await update_display_name()


async def _handle_imagescanning_list(message):
    """Send the enabled/disabled labels read from the labels JSON file."""
    with open(var.labels_file, 'r') as file:
        logging.info(f"Showing list of imagescanning labels from {var.labels_file}")
        data = json.load(file)

    top_level_true = []
    top_level_false = []
    lower_level_true = []
    lower_level_false = []
    for top_level_object, top_level_data in data.items():
        status = top_level_data.get("status")
        if status is True:
            top_level_true.append(top_level_object)
        elif status is False:
            top_level_false.append(top_level_object)

        # Every key other than "status" is treated as a lower-level label.
        for lower_level_object, lower_level_status in top_level_data.items():
            if lower_level_object == "status":
                continue
            if lower_level_status is True:
                lower_level_true.append(f"{lower_level_object}")
            elif lower_level_status is False:
                lower_level_false.append(f"{lower_level_object}")

    summary = (f"*❌ Disabled:* \n"
               f" _Top level:_ {', '.join(top_level_false)}, \n"
               f" _Lower level:_ {', '.join(lower_level_false)}\n"
               f"*✅ Enabled:* \n"
               f" _Top level:_ {', '.join(top_level_true)}, \n"
               f" _Lower level:_ {', '.join(lower_level_true)}")
    logging.debug(summary)
    await send_message(summary)


async def _handle_not_implemented(message):
    """Placeholder reply for commands that have no implementation yet."""
    await send_message("Not yet implemented")


async def _handle_reports(message):
    """Send whatever get_reports() returns."""
    reports = await get_reports()
    await send_message(reports)


# Ordered (trigger, handler) pairs; the first trigger found in the message
# text wins, so the order mirrors the intended command precedence.
_COMMANDS = (
    ("@automod status", _handle_status),
    ("@automod i love you", _handle_love),
    ("@automod ping", _handle_ping),
    ("@automod help", _handle_help),
    ("@automod textscanning on", _handle_textscanning_on),
    ("@automod textscanning off", _handle_textscanning_off),
    ("@automod textscanning list", _handle_textscanning_list),
    ("@automod textscanning add", _handle_textscanning_add),
    ("@automod textscanning remove", _handle_textscanning_remove),
    ("@automod imagescanning on", _handle_imagescanning_on),
    ("@automod imagescanning off", _handle_imagescanning_off),
    ("@automod imagescanning list", _handle_imagescanning_list),
    ("@automod imagescanning enablefilter", _handle_not_implemented),
    ("@automod imagescanning disablefilter", _handle_not_implemented),
    ("@automod reports", _handle_reports),
)


async def check_chat():
    """Poll the chat and act on any new '@automod' commands.

    Messages whose id is not present in ``var.old_chat`` are considered new.
    They are processed oldest-first, at most ``_MAX_MESSAGES_PER_CHECK`` per
    call. A message is treated as a command when '@automod' appears within its
    first eight characters. On the very first call (empty ``var.old_chat``)
    the fetched messages only seed the history and nothing is processed.
    ``var.old_chat`` is replaced with the fetched messages at the end.
    """
    new_chat = await get_chat_messages()
    if not var.old_chat:
        logging.info("No chat messages were loaded before, so setting the old chat value to the new one")
        var.old_chat = new_chat

    old_ids = {message["id"] for message in var.old_chat}
    newest_messages = [message for message in new_chat if message["id"] not in old_ids]
    newest_messages.sort(key=lambda message: message["id"])

    for message in newest_messages[:_MAX_MESSAGES_PER_CHECK]:
        # A message without usable text cannot be a command; skipping it keeps
        # one odd message from aborting the whole poll before old_chat updates.
        text = message.get("text") or ""
        if "@automod" not in text[:8]:
            continue

        logging.info(f"Somebody mentioned me with {text}")
        for trigger, handler in _COMMANDS:
            if trigger in text:
                await handler(message)
                break
        else:
            await send_message("Invalid command given")

    var.old_chat = new_chat