#! /usr/bin/env python3 # -*- coding: utf-8 -*- # LinuxGSM query_gsquery.py function # Author: Daniel Gibbs # Contributors: http://linuxgsm.com/contrib # Website: https://linuxgsm.com # Description: Allows querying of various game servers. import argparse import socket import sys engine_types=('protocol-valve','protocol-quake3','protocol-quake3','protocol-gamespy1','protocol-unreal2','ut3','minecraft','minecraftbe','jc2mp','mumbleping','soldat','teeworlds') class gsquery: server_response_timeout = 5 default_buffer_length = 1024 sourcequery=('protocol-valve','avalanche3.0','barotrauma','madness','quakelive','realvirtuality','refractor','source','goldsrc','spark','starbound','unity3d','unreal4','wurm') idtech2query=('protocol-quake3','idtech2','quake','iw2.0') idtech3query=('protocol-quake3','iw3.0','ioquake3','qfusion') minecraftquery=('minecraft','lwjgl2') minecraftbequery=('minecraftbe',) jc2mpquery=('jc2mp',) mumblequery=('mumbleping',) soldatquery=('soldat',) twquery=('teeworlds',) unrealquery=('protocol-gamespy1','unreal') unreal2query=('protocol-unreal2','unreal2') unreal3query=('ut3','unreal3') def __init__(self, arguments): self.argument = arguments # if self.argument.engine in self.sourcequery: self.query_prompt_string = b'\xFF\xFF\xFF\xFFTSource Engine Query\0' elif self.argument.engine in self.idtech2query: self.query_prompt_string = b'\xff\xff\xff\xffstatus\x00' elif self.argument.engine in self.idtech3query: self.query_prompt_string = b'\xff\xff\xff\xffgetstatus' elif self.argument.engine in self.jc2mpquery: self.query_prompt_string = b'\xFE\xFD\x09\x10\x20\x30\x40' elif self.argument.engine in self.minecraftquery: self.query_prompt_string = b'\xFE\xFD\x09\x3d\x54\x1f\x93' elif self.argument.engine in self.minecraftbequery: self.query_prompt_string = b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\xfe\xfe\xfe\xfe\xfd\xfd\xfd\xfd\x12\x34\x56\x78\x00\x00\x00\x00\x00\x00\x00\x00' elif self.argument.engine in self.mumblequery: self.query_prompt_string = b'\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08' elif self.argument.engine in self.soldatquery: self.query_prompt_string = b'\x69\x00' elif self.argument.engine in self.twquery: self.query_prompt_string = b'\x04\x00\x00\xff\xff\xff\xff\x05' + bytearray(511) elif self.argument.engine in self.unrealquery: self.query_prompt_string = b'\x5C\x69\x6E\x66\x6F\x5C' elif self.argument.engine in self.unreal2query: self.query_prompt_string = b'\x79\x00\x00\x00\x00' elif self.argument.engine in self.unreal3query: self.query_prompt_string = b'\xFE\xFD\x09\x00\x00\x00\x00' self.connected = False self.response = None @staticmethod def fatal_error(error_message, error_code=1): sys.stderr.write('ERROR: ' + str(error_message) + '\n') sys.exit(error_code) @staticmethod def exit_success(success_message=''): sys.stdout.write('OK: ' + str(success_message) + '\n') sys.exit(0) def responding(self): # Connect. connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) connection.settimeout(self.server_response_timeout) try: self.connected = connection.connect((self.argument.address, int(self.argument.port))) except socket.timeout: self.fatal_error('Request timed out', 1) except Exception: self.fatal_error('Unable to connect', 1) # Send. connection.send(self.query_prompt_string) # Receive. try: self.response = connection.recv(self.default_buffer_length) except socket.error: self.fatal_error('Unable to receive', 2) connection.close() # Response. if self.response is None: self.fatal_error('No response', 3) if len(self.response) < 5: sys.exit('Short response.', 3) else: self.exit_success(str(self.response)) def parse_args(): parser = argparse.ArgumentParser( description='Allows querying of various game servers.', usage='usage: python3 %(prog)s [options]', add_help=False ) parser.add_argument( '-a', '--address', type=str, required=True, help='The IPv4 address of the server.' ) parser.add_argument( '-p', '--port', type=int, required=True, help='The IPv4 port of the server.' ) parser.add_argument( '-e', '--engine', metavar='ENGINE', choices=engine_types, help='Engine type: ' + ' '.join(engine_types) ) parser.add_argument( '-v', '--verbose', action='store_true', help='Display verbose output.' ) parser.add_argument( '-d', '--debug', action='store_true', help='Display debugging output.' ) parser.add_argument( '-V', '--version', action='version', version='%(prog)s 0.0.1', help='Display version and exit.' ) parser.add_argument( '-h', '--help', action='help', help='Display help and exit.' ) return parser.parse_args() def main(): arguments = parse_args() server = gsquery(arguments) server.responding() if __name__ == '__main__': main()