diff options
author | Alexander Sulfrian <alexander@sulfrian.net> | 2013-07-25 19:26:52 +0200 |
---|---|---|
committer | Alexander Sulfrian <alexander@sulfrian.net> | 2015-12-03 03:29:37 +0100 |
commit | f3b86ed2d5bce85b3b9c27eb949cd88cf7d526bc (patch) | |
tree | a187725f5ec7f1c67d2b9a471b4000ad40a16b04 /irc.py | |
parent | 4b905606692a2f147946505abe6ade1c241d1000 (diff) | |
download | bot-f3b86ed2d5bce85b3b9c27eb949cd88cf7d526bc.tar.gz bot-f3b86ed2d5bce85b3b9c27eb949cd88cf7d526bc.tar.bz2 bot-f3b86ed2d5bce85b3b9c27eb949cd88cf7d526bc.zip |
fix indentation and trailing whitespaces
Diffstat (limited to 'irc.py')
-rwxr-xr-x | irc.py | 388 |
1 files changed, 194 insertions, 194 deletions
@@ -10,197 +10,197 @@ http://inamidst.com/phenny/ import sys, re, time, traceback import socket, asyncore, asynchat -class Origin(object): - source = re.compile(r'([^!]*)!?([^@]*)@?(.*)') - - def __init__(self, bot, source, args): - match = Origin.source.match(source or '') - self.nick, self.user, self.host = match.groups() - - if len(args) > 1: - target = args[1] - else: target = None - - mappings = {bot.nick: self.nick, None: None} - self.sender = mappings.get(target, target) - -class Bot(asynchat.async_chat): - def __init__(self, nick, name, channels, password=None): - asynchat.async_chat.__init__(self) - self.set_terminator('\n') - self.buffer = '' - - self.nick = nick - self.user = nick - self.name = name - self.password = password - - self.verbose = True - self.channels = channels or [] - self.stack = [] - - import threading - self.sending = threading.RLock() - - def initiate_send(self): - self.sending.acquire() - asynchat.async_chat.initiate_send(self) - self.sending.release() - - # def push(self, *args, **kargs): - # asynchat.async_chat.push(self, *args, **kargs) - - def __write(self, args, text=None): - # print 'PUSH: %r %r %r' % (self, args, text) - try: - if text is not None: - # 510 because CR and LF count too, as nyuszika7h points out - self.push((' '.join(args) + ' :' + text)[:510] + '\r\n') - else: self.push(' '.join(args)[:510] + '\r\n') - except IndexError: - pass - - def write(self, args, text=None): - # This is a safe version of __write - def safe(input): - input = input.replace('\n', '') - input = input.replace('\r', '') - return input.encode('utf-8') - try: - args = [safe(arg) for arg in args] - if text is not None: - text = safe(text) - self.__write(args, text) - except Exception, e: pass - - def run(self, host, port=6667): - self.initiate_connect(host, port) - - def initiate_connect(self, host, port): - if self.verbose: - message = 'Connecting to %s:%s...' % (host, port) - print >> sys.stderr, message, - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect((host, port)) - try: asyncore.loop() - except KeyboardInterrupt: - sys.exit() - - def handle_connect(self): - if self.verbose: - print >> sys.stderr, 'connected!' - if self.password: - self.write(('PASS', self.password)) - self.write(('NICK', self.nick)) - self.write(('USER', self.user, '+iw', self.nick), self.name) - - def handle_close(self): - self.close() - print >> sys.stderr, 'Closed!' - - def collect_incoming_data(self, data): - self.buffer += data - - def found_terminator(self): - line = self.buffer - if line.endswith('\r'): - line = line[:-1] - self.buffer = '' - - # print 'GOT:', repr(line) - if line.startswith(':'): - source, line = line[1:].split(' ', 1) - else: source = None - - if ' :' in line: - argstr, text = line.split(' :', 1) - else: argstr, text = line, '' - args = argstr.split() - - origin = Origin(self, source, args) - self.dispatch(origin, tuple([text] + args)) - - if args[0] == 'PING': - self.write(('PONG', text)) - - def dispatch(self, origin, args): - pass - - def msg(self, recipient, text): - self.sending.acquire() - - # Cf. http://swhack.com/logs/2006-03-01#T19-43-25 - if isinstance(text, unicode): - try: text = text.encode('utf-8') - except UnicodeEncodeError, e: - text = e.__class__ + ': ' + str(e) - if isinstance(recipient, unicode): - try: recipient = recipient.encode('utf-8') - except UnicodeEncodeError, e: - return - - # No messages within the last 3 seconds? Go ahead! - # Otherwise, wait so it's been at least 0.8 seconds + penalty - if self.stack: - elapsed = time.time() - self.stack[-1][0] - if elapsed < 3: - penalty = float(max(0, len(text) - 50)) / 70 - wait = 0.8 + penalty - if elapsed < wait: - time.sleep(wait - elapsed) - - # Loop detection - messages = [m[1] for m in self.stack[-8:]] - if messages.count(text) >= 5: - text = '...' - if messages.count('...') >= 3: - self.sending.release() - return - - def safe(input): - input = input.replace('\n', '') - return input.replace('\r', '') - self.__write(('PRIVMSG', safe(recipient)), safe(text)) - self.stack.append((time.time(), text)) - self.stack = self.stack[-10:] - - self.sending.release() - - def notice(self, dest, text): - self.write(('NOTICE', dest), text) - - def error(self, origin): - try: - import traceback - trace = traceback.format_exc() - print trace - lines = list(reversed(trace.splitlines())) - - report = [lines[0].strip()] - for line in lines: - line = line.strip() - if line.startswith('File "/'): - report.append(line[0].lower() + line[1:]) - break - else: report.append('source unknown') - - self.msg(origin.sender, report[0] + ' (' + report[1] + ')') - except: self.msg(origin.sender, "Got an error.") - -class TestBot(Bot): - def f_ping(self, origin, match, args): - delay = m.group(1) - if delay is not None: - import time - time.sleep(int(delay)) - self.msg(origin.sender, 'pong (%s)' % delay) - else: self.msg(origin.sender, 'pong') - f_ping.rule = r'^\.ping(?:[ \t]+(\d+))?$' - -def main(): - # bot = TestBot('testbot', ['#d8uv.com']) - # bot.run('irc.freenode.net') - print __doc__ - -if __name__=="__main__": - main() +class Origin(object): + source = re.compile(r'([^!]*)!?([^@]*)@?(.*)') + + def __init__(self, bot, source, args): + match = Origin.source.match(source or '') + self.nick, self.user, self.host = match.groups() + + if len(args) > 1: + target = args[1] + else: target = None + + mappings = {bot.nick: self.nick, None: None} + self.sender = mappings.get(target, target) + +class Bot(asynchat.async_chat): + def __init__(self, nick, name, channels, password=None): + asynchat.async_chat.__init__(self) + self.set_terminator('\n') + self.buffer = '' + + self.nick = nick + self.user = nick + self.name = name + self.password = password + + self.verbose = True + self.channels = channels or [] + self.stack = [] + + import threading + self.sending = threading.RLock() + + def initiate_send(self): + self.sending.acquire() + asynchat.async_chat.initiate_send(self) + self.sending.release() + + # def push(self, *args, **kargs): + # asynchat.async_chat.push(self, *args, **kargs) + + def __write(self, args, text=None): + # print 'PUSH: %r %r %r' % (self, args, text) + try: + if text is not None: + # 510 because CR and LF count too, as nyuszika7h points out + self.push((' '.join(args) + ' :' + text)[:510] + '\r\n') + else: self.push(' '.join(args)[:510] + '\r\n') + except IndexError: + pass + + def write(self, args, text=None): + # This is a safe version of __write + def safe(input): + input = input.replace('\n', '') + input = input.replace('\r', '') + return input.encode('utf-8') + try: + args = [safe(arg) for arg in args] + if text is not None: + text = safe(text) + self.__write(args, text) + except Exception, e: pass + + def run(self, host, port=6667): + self.initiate_connect(host, port) + + def initiate_connect(self, host, port): + if self.verbose: + message = 'Connecting to %s:%s...' % (host, port) + print >> sys.stderr, message, + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect((host, port)) + try: asyncore.loop() + except KeyboardInterrupt: + sys.exit() + + def handle_connect(self): + if self.verbose: + print >> sys.stderr, 'connected!' + if self.password: + self.write(('PASS', self.password)) + self.write(('NICK', self.nick)) + self.write(('USER', self.user, '+iw', self.nick), self.name) + + def handle_close(self): + self.close() + print >> sys.stderr, 'Closed!' + + def collect_incoming_data(self, data): + self.buffer += data + + def found_terminator(self): + line = self.buffer + if line.endswith('\r'): + line = line[:-1] + self.buffer = '' + + # print 'GOT:', repr(line) + if line.startswith(':'): + source, line = line[1:].split(' ', 1) + else: source = None + + if ' :' in line: + argstr, text = line.split(' :', 1) + else: argstr, text = line, '' + args = argstr.split() + + origin = Origin(self, source, args) + self.dispatch(origin, tuple([text] + args)) + + if args[0] == 'PING': + self.write(('PONG', text)) + + def dispatch(self, origin, args): + pass + + def msg(self, recipient, text): + self.sending.acquire() + + # Cf. http://swhack.com/logs/2006-03-01#T19-43-25 + if isinstance(text, unicode): + try: text = text.encode('utf-8') + except UnicodeEncodeError, e: + text = e.__class__ + ': ' + str(e) + if isinstance(recipient, unicode): + try: recipient = recipient.encode('utf-8') + except UnicodeEncodeError, e: + return + + # No messages within the last 3 seconds? Go ahead! + # Otherwise, wait so it's been at least 0.8 seconds + penalty + if self.stack: + elapsed = time.time() - self.stack[-1][0] + if elapsed < 3: + penalty = float(max(0, len(text) - 50)) / 70 + wait = 0.8 + penalty + if elapsed < wait: + time.sleep(wait - elapsed) + + # Loop detection + messages = [m[1] for m in self.stack[-8:]] + if messages.count(text) >= 5: + text = '...' + if messages.count('...') >= 3: + self.sending.release() + return + + def safe(input): + input = input.replace('\n', '') + return input.replace('\r', '') + self.__write(('PRIVMSG', safe(recipient)), safe(text)) + self.stack.append((time.time(), text)) + self.stack = self.stack[-10:] + + self.sending.release() + + def notice(self, dest, text): + self.write(('NOTICE', dest), text) + + def error(self, origin): + try: + import traceback + trace = traceback.format_exc() + print trace + lines = list(reversed(trace.splitlines())) + + report = [lines[0].strip()] + for line in lines: + line = line.strip() + if line.startswith('File "/'): + report.append(line[0].lower() + line[1:]) + break + else: report.append('source unknown') + + self.msg(origin.sender, report[0] + ' (' + report[1] + ')') + except: self.msg(origin.sender, "Got an error.") + +class TestBot(Bot): + def f_ping(self, origin, match, args): + delay = m.group(1) + if delay is not None: + import time + time.sleep(int(delay)) + self.msg(origin.sender, 'pong (%s)' % delay) + else: self.msg(origin.sender, 'pong') + f_ping.rule = r'^\.ping(?:[ \t]+(\d+))?$' + +def main(): + # bot = TestBot('testbot', ['#d8uv.com']) + # bot.run('irc.freenode.net') + print __doc__ + +if __name__=="__main__": + main() |