from matrixbot import MatrixBot from functools import partial import random import mimetypes import json from os import listdir,environ from os.path import isfile, join import logging from datetime import datetime from googletrans import LANGUAGES,Translator logging.basicConfig(level=logging.DEBUG) log = logging.getLogger("Kalauerbot") version = "1.0.0" class Kalauerbot(MatrixBot): def __init__(self,host,display_name,token,user_id,dbpath = "kalauerdb.json",allowlist=None,oofdir="oof"): log.debug(f"dbpath: {dbpath}") log.debug(f"allowlist: {allowlist}") self.allowlist = allowlist self.dbpath=dbpath self.load_db() try: self.oofs = [ join(oofdir,f) for f in listdir(oofdir) if isfile(join(oofdir, f))] except: log.info(f"cannot load images from oofdir {oofdir}") self.oofs = [] super().__init__(host,display_name,token,user_id) def load_db(self): try: log.info(f"loading entries from {self.dbpath}") self.db = json.load(open(self.dbpath)) log.debug(self.db) except: log.info(f"Cannot open {self.dbpath}, creating empty db") self.db = { "__chosen__": None } self.save_db() def save_db(self): json.dump(self.db,open(self.dbpath,"w+")) log.debug(f"saving db to {self.dbpath}") def active_db(self): return dict(self._get_active()) def active_db(self): return dict(self._get_active()) def _get_db(self): for name,entry in self.db.items(): if name == "__chosen__": continue # skip the __chosen__ special object yield entry def _get_all_kalauer(self): for name,entry in self.db.items(): if name == "__chosen__": continue for record in entry.get('record',[]): yield name,record['date'],record['text'] def _get_active(self): for name,entry in self.db.items(): if name == "__chosen__": continue # skip the __chosen__ special object elif entry['active']: yield name,entry else: log.debug(f"skipping {name} because user is inactive") continue def process_event(self, room, event): sender = event['sender'] if self.user_id == sender: return # don't answer own message if event['type'] != 'm.room.message': return try: msg = event['content']['body'] if self.allowlist and room.room_id not in self.allowlist: log.debug("Ignoring message {msg} because {room.room_id} is not in allowlist") say = partial(self.say,room) db = self.db chosen = db['__chosen__'] if msg == "!help" or msg == "!commands": say(f""" Kalauerbot Version {version} available commands: !add - add a new person to the kalauer roster (alias: !activate, !reactivate) !delete - deactivate the person from the roster (alias: !remove, !deactivate, !freeze) !deactivate - deactivate person (e.g. because he is in holiday) !activate - re-activate person !kill - ultimately remove person from kalauer roster (cannot be reversed) !choose - choose the next person to be the kalauer könig !who - find out who was chosen to be todays kalauer könig (alias: !wer, !current) !ok - confirm the kalauer (alias: !gut !passt !reicht !approved) !reject - reject the kalauer (alias: !zu-gut !zu-schlecht !französisch !french) !demand - demand a certain person to be the new kalauer könig !list - list the current roster and weight,totals !record - record a kalauer for someone kalauer: - record your kalauer !kalauer - one of the old kalauers """) try: command,args= msg.split(" ",1) except: command,args = msg,"" if command in ["!add", "!activate", "!reactivate"]: name = args.strip() if name in db and db[name]['active']: say(f"{name} already part of the kalauer roster") elif name in db and not db[name]['active']: say(f"Reactivating {name}. Welcome back to the world of fun, {name}!") db[name]['active'] = True else: say(f"{name} has been added to the kalauer roster, starting weight is 1") db[name] = { "weight": 1, "total":0, "rejected":0, "active": True} self.save_db() elif command in ["!delete","!remove","!deactivate","!freeze"]: name = args.strip() if name not in db: say(f"Cannot deactiate someone who was never in kalauer db") elif name in db and db[name]['active']: say(f"Deactivating {name}, see you soon!") db[name]['active'] = False else: say(f"{name} was already deactivated in the kalauer db") self.save_db() elif command == "!choose": # TODO: refactor this, maybe? names = [] weights = [] for name,entity in self.active_db().items(): names.append(name) weights.append(entity['weight']) log.debug(names) log.debug(weights) chosen = random.choices(names,weights=weights)[0] chosen_weight = db[chosen]['weight'] all_weight = sum(weights) prob = round(chosen_weight / all_weight * 100,1) crosscheck = random.choices(names,weights=weights,k=100000) say(f"{chosen} was chosen with a probability of {prob}% to be the next kalauer könig, congratulations!\n" + f"cross-check: { crosscheck.count(chosen):,} out of 100,000 rolls were {chosen}") db["__chosen__"] = chosen self.save_db() elif command in ["!who","!wer", "!current"]: if chosen: say(f"{chosen} was chosen to be the next kalauer könig") else: say(f"Nobody was chosen to be the the kalauer könig, start the process with `!choose`") elif command in [ "!oof","!ooof","!oooof"]: if not db['__chosen__']: say("Nobody is nominated, cannot oof kalauer") else: log.info("Increasing weight of all active persons") for name in self.active_db(): db[name]['weight'] += 1 log.info("The kalauer könig gets his weight set to 0") db[chosen]['weight'] = 0 db[chosen]['total'] += 1 if self.oofs: oof = random.choice(self.oofs) log.info(f"uploading {oof}") mime = mimetypes.guess_type(oof)[0] oofurl = self.client.upload(open(oof,'rb').read(),mime) room.send_image(url=oofurl,name='oof') else: log.info("not uploading image because oofdir is empty") db[chosen]['rejected'] = db[chosen].get('rejected',0) + 1 say(f"""{chosen}, your kalauer just got oof'ed! Your total number of approved kalauers is {db[chosen]['total']} and rejected kalauers is {db[chosen]['rejected']}\n\nPlease choose a new potential kalauer könig by typing in `!choose` to let the kalauerbot choose or `!demand ` to force your choice\n\nDo not forget to record your kalauer with `kalauer: the funniest joke`. Someone else can also record the kalauer for you via `!record the funniest joke`""") log.debug("Unsetting current chosen") db["__chosen__"] = None self.save_db() elif command in [ "!ok", "!approved", "!gut", "!passt", "!reicht" ]: if not db['__chosen__']: say("Nobody is nominated, cannot approve kalauer") else: log.info("Increasing weight of all active persons") for name in self.active_db(): db[name]['weight'] += 1 log.info("The kalauer könig gets his weight set to 0") db[chosen]['weight'] = 0 db[chosen]['total'] += 1 say(f"""{chosen}, your kalauer has been approved! Your total number of approved kalauers is {db[chosen]['total']}\n\nPlease choose a new potential kalauer könig by typing in `!choose` to let the kalauerbot choose or `!demand ` to force your choice\n\nDo not forget to record your kalauer with `kalauer: the funniest joke`. Someone else can also record the kalauer for you via `!record the funniest joke`""") log.debug("Unsetting current chosen") db["__chosen__"] = None self.save_db() elif command in [ "!reject", "!zu-gut", "!zu-schlecht", "!französisch", "!french"]: if not db['__chosen__']: say("Nobody is nominated, cannot reject kalauer") else: db[db['__chosen__']]['rejected'] = db[db['__chosen__']].get('rejected',0) + 1 say(f"{chosen}, your kalauer privilege was revoked and your reject counter increased to {db[db['__chosen__']]['rejected']}, nobody is chosen now. Choose another kalauer könig with `!choose`") db["__chosen__"] = None self.save_db() elif command == "!demand": name = args.strip() if not name: say("When demanding a player, you must define who you actually want") elif name in self.active_db(): say(f"The previous kalauer könig demands user {name} to be the next kalauer könig. The rules of the Kalauer are divine and shall be respected.") db["__chosen__"] = name elif name in db: say(f"{name} is currently not active, cannot be chosen") else: say(f"the name '{name}' does not exist in the database, add the user before demanding a kalauer from him") self.save_db() elif command == "!list": all_weight = sum([db[a]["weight"] for a in db if a != "__chosen__" and db[a]["active"]] ) txt = "Kalauer roster:\n\n" for name,entry in db.items(): if name == '__chosen__': continue prob = round(entry['weight']/all_weight * 100,1) txt += f"* {name} - {entry['weight']} weight ({prob if entry['active'] else 0}%) - {entry['total']} approved and {entry.get('rejected',0)} rejected {'(active)' if entry['active'] else '(inactive)'}\n" say(txt) elif command == "!identify": name = args.strip() if name not in db: say(f"Name {name} is not in kalauerdb, cannot identify") else: say(f"Your matrix ID {sender} is now associated to the name {name}") db[name]['id'] = sender self.save_db() elif command == "!kill": name = args.strip() if name in db: say(f"Ultimately removing {name} from Kalauer König db (weight: {db[name]['weight']},total: {db[name]['total']}), we will miss you!") del db[name] else: say(f"Cannot delete someone who was never in the kalauer db") self.save_db() elif command == "!record": name,text = args.split(" ",1) if not name in db: say(f'cannot find the name {name} in the kalauer roster') else: entry = db[name] if entry.get('record',None) != list: entry['record'] = [] say(f'Thank you for recording the kalauer! It is now associated to the user {name}') entry['record'].append({ 'date': datetime.now().isoformat(), 'text': text}) self.save_db() elif command == "kalauer:": text = args.strip() for name,entry in self.db.items(): if name == "__chosen__": continue if entry.get('id','') == sender: say(f'Thank you for recording the kalauer! It is now associated to you ({name})') if entry.get('record',None) != list: entry['record'] = [] entry['record'].append({ 'date': datetime.now().isoformat(), 'text': text}) break else: say('cannot associate your username to a kalauer user, please run `!identify ` before') self.save_db() elif command == "!kalauer": # TODO: this feature is currently broken #kalauer = "What do you call a fat vegetarian? Compost bin" #translator = Translator() #lang = random.choice(list(LANGUAGES)) #ret = translator.translate(kalauer,src='de',dest=lang) #log.debug(ret) name,date,text = random.choice(list(self._get_all_kalauer())) date = datetime.fromisoformat(date) say(f"{text} - {name} ({date.strftime('%d.%m.%Y')})" ) # TODO #if ret.pronunciation: # say(f"Pronounced: {ret.pronunciation}") else: log.debug(f"Receive message {msg}") except Exception as e: say("Something went wrong when trying to process the command, so sorry!") log.debug(e) def main(): allowlist = environ.get("KALAUER_ALLOWLIST",None) if allowlist: allowlist = allowlist.split(',') bot = Kalauerbot( host = environ.get("MATRIX_SERVER","ext01.citadel.team"), display_name = "Kalauerbot", token = environ['MATRIX_TOKEN'], user_id = environ["MATRIX_ID"], dbpath = environ.get("KALAUER_DBPATH",None) or "kalauerdb.json", allowlist = allowlist, oofdir = environ.get("KALAUER_OOFDIR","oof") ) bot.start() if __name__ == "__main__": main()