00001
00002 '''
00003 This module contains all imports, defines and basic classes for start_client.py.
00004
00005 Reading University
00006 MSc in Network Centered Computing
00007 a.weise - a.weise@reading.ac.uk - December 2005
00008 '''
00009
00010 import os, sys, signal, re, copy
00011 import string, time
00012
00013
00014 import sqlite
00015
00016
00017 import smtplib, socket
00018
00019
00020 from xml.sax import make_parser
00021 from xml.sax.handler import ContentHandler, feature_namespaces
00022 import xml.sax
00023
00024
00025 from M2Crypto.m2xmlrpclib import Server, SSL_Transport
00026 from M2Crypto import SSL
00027
00028
00029 import threading, thread
00030
00031
00032
00033 class MyContentHandler(ContentHandler):
00034 '''
00035 This class is derived from _xmlplus.sax.handler and provides individual functions for parsing the xml file.
00036 '''
00037
00038 def __init__ (self, db_object, ip, ignore_error, mail_obj, verbose):
00039 '''
00040 Constructor
00041 '''
00042 self._verbose = verbose
00043 self._my_mail_ignore_error = ignore_error
00044 self._mail_obj = mail_obj
00045 self._ip = ip
00046 self._db = db_object
00047 self._db_access = self._db.get_access_cursor()
00048 self._searchTerm = ""
00049 self._date = ""
00050 self._date_flag = 0
00051 self._time = ""
00052 self._time_flag = 0
00053 self._error_number = 0
00054 self._error_number_flag = 0
00055 self._error_string = ""
00056 self._error_string_flag = 0
00057 self._linenumber = 0
00058 self._linenumber_flag = 0
00059
00060 def set_ip(self, ip):
00061 '''
00062 The function sets the member variable _ip.
00063 '''
00064 self._ip = ip
00065
00066 def startElement(self, tag, attr):
00067 '''
00068 The function overwrites the startElement function.
00069 '''
00070 self._searchTerm = tag
00071
00072 def characters (self, tag_text):
00073 '''
00074 This function overwrites the character function to extract the tag content.
00075 '''
00076 if (self._searchTerm == "date"):
00077 self._date = tag_text
00078 self._date_flag = 1
00079 elif (self._searchTerm == "time"):
00080 self._time = tag_text
00081 self._time_flag = 1
00082 elif (self._searchTerm == "error_number"):
00083 self._error_number = tag_text
00084 self._error_number_flag = 1
00085 elif (self._searchTerm == "error_string"):
00086 self._error_string = tag_text
00087 self._error_string_flag = 1
00088 elif (self._searchTerm == "linenumber"):
00089 self._linenumber = tag_text
00090 self._linenumber_flag = 1
00091
00092 def endElement(self, tag):
00093 '''
00094 This function overwrites endElement function.
00095 '''
00096 if (self._searchTerm == "date"):
00097 pass
00098 elif (self._searchTerm == "time"):
00099 pass
00100 elif (self._searchTerm == "error_number"):
00101 pass
00102 elif (self._searchTerm == "error_string"):
00103 self._error_string = self._error_string.replace("\"", "")
00104 elif (self._searchTerm == "linenumber"):
00105 pass
00106 self._searchTerm = ""
00107
00108 if(self._date_flag == 1 and self._time_flag == 1 and self._error_number_flag == 1 and self._error_string_flag == 1 and self._linenumber_flag == 1):
00109
00110 success = self._insert()
00111 if success == -1:
00112
00113 print " raise exception"
00114 assert success == 0
00115
00116
00117 if (0 != len(self._mail_obj)):
00118 for i in range(len(self._mail_obj)):
00119 check = self._test_keywords(self._my_mail_ignore_error[i], len(self._my_mail_ignore_error[i])-1, self._error_string)
00120 if (0 == check):
00121
00122 cont = "\n--------------------------------------"\
00123 "\ndate:\t\t\t"+self._date+ \
00124 "\ntime:\t\t\t"+self._time+ \
00125 "\nerror message:\t\t"+self._error_string+ \
00126 "\nline number:\t\t"+self._linenumber
00127
00128 self._mail_obj[i][0].add(cont)
00129
00130 self._mail_obj[i][0].count()
00131
00132 temp = "%s (%s)" % (self._date, self._time)
00133 if self._mail_obj[i][0].get_first_date() == '':
00134 self._mail_obj[i][0].set_first_date(temp)
00135
00136 self._mail_obj[i][0].set_last_date(temp)
00137
00138
00139 self._reset()
00140
00141 def _test_keywords(self, keywordlist, amount_of_keywords, teststring):
00142 '''
00143 This is a recursive function, which tests if a list of keywords is part of a string (AND relation). If all keywords found 0 is returned, otherwise -1
00144
00145 keywordlist = list of all keywords
00146 amount_of_keywords = number of keywords in list
00147 teststring = string, which needs to be investigated
00148
00149 return -1 if line is not interesting
00150 return 0 if line is taken
00151 '''
00152 if (amount_of_keywords == 0):
00153
00154 if( 2 == len(keywordlist[amount_of_keywords])):
00155 if (-1 == teststring.rfind(keywordlist[amount_of_keywords][0])):
00156
00157 return 0
00158 else:
00159 if( -1 == keywordlist[amount_of_keywords][1].rfind("!")):
00160
00161 temp = keywordlist[amount_of_keywords][1].strip("!")
00162 if ( -1 == teststring.rfind(temp)):
00163
00164 return 0
00165 else:
00166 return -1
00167 else:
00168
00169 if ( -1 != teststring.rfind(keywordlist[amount_of_keywords][1])):
00170
00171 return 0
00172 else:
00173 return -1
00174 else:
00175 if (-1 == teststring.rfind(keywordlist[amount_of_keywords][0])):
00176
00177 return 0
00178 else:
00179 return -1
00180 else:
00181 if( 2 == len(keywordlist[amount_of_keywords])):
00182 if (-1 == teststring.rfind(keywordlist[amount_of_keywords][0])):
00183
00184 return self._test_keywords(keywordlist, amount_of_keywords-1, teststring)
00185 else:
00186 if( -1 == keywordlist[amount_of_keywords][1].rfind("!")):
00187
00188 temp = keywordlist[amount_of_keywords][1].strip("!")
00189 if ( -1 == teststring.rfind(temp)):
00190
00191 return self._test_keywords(keywordlist, amount_of_keywords-1, teststring)
00192 else:
00193 return -1
00194 else:
00195
00196 if ( -1 != teststring.rfind(keywordlist[amount_of_keywords][1])):
00197
00198 return self._test_keywords(keywordlist, amount_of_keywords-1, teststring)
00199 else:
00200 return -1
00201 else:
00202 if (-1 == teststring.rfind(keywordlist[amount_of_keywords][0])):
00203
00204 return self._test_keywords(keywordlist, amount_of_keywords-1, teststring)
00205 else:
00206 return -1
00207
00208 def _reset(self):
00209 '''
00210 This function resets member variables.
00211 '''
00212 self._date = ""
00213 self._date_flag = 0
00214 self._time = ""
00215 self._time_flag = 0
00216 self._error_number = 0
00217 self._error_number_flag = 0
00218 self._error_string = ""
00219 self._error_string_flag = 0
00220 self._linenumber = 0
00221 self._linenumber_flag = 0
00222
00223 def _insert(self):
00224 '''
00225 This function inserts the data from the xml file into the database.
00226 '''
00227
00228 if ('-' == self._error_number):
00229
00230 self._error_number = 999999
00231
00232 sql = ' SELECT * FROM error WHERE e_number = "%s" ' % self._error_number
00233 success, self._db_access = self._db.execute_sql(1200, self._db_access, sql)
00234 if success == -1:
00235 return -1
00236 data = self._db_access.fetchall()
00237 if ( 0 == len(data)):
00238
00239 sql = ' INSERT INTO error (e_number, e_name, e_description) VALUES ("%s", "not specified", "") ' % self._error_number
00240 data = self._db_access.execute(sql)
00241 sql = ' SELECT * FROM error WHERE e_number = "%s" ' % self._error_number
00242 success, self._db.execute_sql(1200, self._db_access, sql)
00243 if success == -1:
00244 return -1
00245 data = self._db_access.fetchall()
00246
00247 error_id = data[0]["e_id"]
00248
00249
00250 sql = 'SELECT * FROM messages WHERE error_e_id = "%s"'% error_id+ \
00251 ' AND m_date = "%s" '% self._date + \
00252 ' AND m_time = "%s" '% self._time + \
00253 ' AND m_error_string = "%s"' %( self._error_string)
00254
00255 success, self._db.execute_sql(1200, self._db_access, sql)
00256 if success == -1:
00257 return -1
00258 data = self._db_access.fetchall()
00259 if (0 == len(data)):
00260
00261
00262 sql = 'SELECT * FROM host WHERE h_ip_address = "%s";' % self._ip
00263 success, self._db.execute_sql(1200, self._db_access, sql)
00264 if success == -1:
00265 return -1
00266 data = self._db_access.fetchall()
00267 if (1 == len(data)):
00268 ip = data[0]["h_id"]
00269 else:
00270 ip = data[0]["h_id"]
00271
00272 sql = 'INSERT INTO messages (host_h_id, error_e_id, m_date, m_time, m_error_string, m_line_number) VALUES (%s, %s, "%s", "%s", "%s", %s);' % (ip, error_id, self._date, self._time, self._error_string, self._linenumber)
00273 success, self._db.execute_sql(1200, self._db_access, sql)
00274 if success == -1:
00275 return -1
00276 else:
00277 pass
00278
00279 return 0
00280
00281
00282
00283 class Mail:
00284 '''
00285 This class deals with the mail issues.
00286 '''
00287
00288 def __init__(self, mail_address, smtp_server, smtp_pass, smtp_from, user, verbose):
00289 '''
00290 Constructor
00291 '''
00292 self._verbose = verbose
00293 self._mail_address = mail_address
00294 self._smtp_server = smtp_server
00295 self._smtp_pass = smtp_pass
00296 self._smtp_from = smtp_from
00297 self._smtp_user = user
00298 self._mail_name = "temp_email_unknown.txt"
00299
00300 self._error_count = 0
00301 self._first_date = ''
00302 self._last_date = ''
00303 self._first_date_flag = 0
00304
00305
00306
00307 def create_content(self, name):
00308 '''
00309 This function creates a temorary file, where the mail content gets saved temporarly.
00310 '''
00311 try:
00312 file_fd = open(name, 'w')
00313 self._mail_name = name
00314 file_fd.close()
00315 return 0
00316 except IOError, e:
00317 if self._verbose == 1:
00318 print "%s -> Problem creating email content -> " % (time.ctime(), e)
00319 return -1
00320
00321 def add(self, content):
00322 '''
00323 This function adds to the mail content.
00324 '''
00325 try:
00326 file_fd = file(self._mail_name, 'r+')
00327 file_fd.seek(0, 2)
00328 file_fd.writelines(content)
00329 file_fd.close()
00330 return 0
00331 except IOError, e:
00332 if self._verbose == 1:
00333 print "%s -> Problem adding email content -> " % (time.ctime(), e)
00334 return -1
00335
00336 def count(self):
00337 '''
00338 This function counts all inserted error within the mail by incrementing the member variable self._error_count.
00339 '''
00340 self._error_count += 1
00341
00342 def set_first_date(self, value):
00343 '''
00344 This function modifies the memeber variable self._first_date.
00345 '''
00346 self._first_date = value
00347
00348 def get_first_date(self):
00349 '''
00350 This function returns the content of the memeber variable self._first_date.
00351 '''
00352 return self._first_date
00353
00354 def set_last_date(self, value):
00355 '''
00356 This function modifies the memeber variable self._last_date
00357 '''
00358 self._last_date = value
00359
00360 def send_mail(self, receiver, server):
00361 '''
00362 This function sends the mail away.
00363 '''
00364 if self._verbose == 1:
00365 print "%s -> Try to send Mail, to -> \"%s\" ..." % (time.ctime(), receiver)
00366
00367
00368 subject = 'SRB LOG FILE PARSER NOTIFICATION - %s' % time.ctime(time.time())
00369 content = 'Hello,\n\nthis is an automatic generated mail from SRB LOG FILE PARSER [ Client ] ! Your are registered for recieving this notification for the SRB Server @ %s where between %s and %s -> %s interesting errors occured. \n\n---------------- error messages start ----------------\n\n' % (server, self._first_date, self._last_date, self._error_count)
00370
00371 try:
00372 if self._error_count <= 5000:
00373 file_fd = open(self._mail_name, 'r')
00374 mail_error = file_fd.read()
00375 file_fd.close()
00376 else:
00377 mail_error = "!!!\n\nTo detailed error messages could not be supplied due to more than 5000 messages. Please check the database or the original SRB log file.\n\n!!!\n"
00378
00379 if mail_error != "":
00380 content += mail_error
00381 content += '\n\n---------------- error messages end ----------------\n\nPlease do not respond to this mail!\n\n\nSRB LOG FILE PARSER [ CLIENT ]\n--\n[ powered by linux]'
00382
00383 timus = time.strftime("%d %B %Y %H:%M:%S")
00384
00385 text = 'From: '+self._smtp_from+'\n'
00386 text += 'To: '+receiver+'\n'
00387 text += 'Date: '+timus+'\n'
00388 text += 'Subject: '+subject+'\n'
00389
00390 text = text + content
00391
00392
00393 server = smtplib.SMTP(self._smtp_server)
00394 server.login(self._smtp_user, self._smtp_pass)
00395
00396
00397 server.sendmail(self._smtp_from, receiver, text)
00398
00399 if self._verbose == 1:
00400 print "%s -> Mail sent to \"%s\" !" % (time.ctime(), receiver)
00401 server.quit()
00402 self._error_count = 0
00403 return 0
00404 else:
00405 if self._verbose == 1:
00406 print "%s -> Nothing to send to \"%s\" !" % (time.ctime(), receiver)
00407 self._error_count = 0
00408 return-1
00409 except smtplib.SMTPAuthenticationError, e:
00410 if self._verbose == 1:
00411 print "%s -> Proplem with SMTP server authentication -> \"%s\" !" % (time.ctime(), e)
00412 print "\n"
00413 self._error_count = 0
00414 return -1
00415 except socket.error, e:
00416 if self._verbose == 1:
00417 print "%s -> Problem with SMTP server -> \"%s\" !" % (time.ctime(), e)
00418 print "\n"
00419 self._error_count = 0
00420 return -1
00421 except:
00422 if self._verbose == 1:
00423 print "%s -> Problem with sending mail to \"%s\" !" % (time.ctime(), receiver)
00424 self._error_count = 0
00425 return -1
00426
00427 def delete_content(self):
00428 '''
00429 This function deletes the temorary file with the mail content.
00430 '''
00431 try:
00432 os.remove(self._mail_name)
00433 if self._verbose == 1:
00434 print "%s -> Deleted -> \"%s\"" % (time.ctime(), self._mail_name)
00435 return 0
00436 except OSError, e:
00437 if self._verbose == 1:
00438 print "%s -> Could not delete mail content file! -> \"%s\" -> %s" % (time.ctime(), self._mail_name, e)
00439 return -1
00440
00441
00442
00443 class MyDatabase:
00444 '''
00445 This class deals with all the database issues.
00446 '''
00447
00448 def __init__(self, error_description_file, error_desciption_path, databasename, database_path, serverlist, project, verbose):
00449 '''
00450 constructor
00451 '''
00452 self._verbose = verbose
00453 error = "%s/%s" % (error_desciption_path, error_description_file)
00454 self._db_access = None
00455 self._database_path = database_path
00456
00457 if(1 == os.path.exists(database_path)):
00458 if self._verbose == 1:
00459 print "%s -> Database exists" % (time.ctime())
00460 os.chdir(database_path)
00461 else:
00462
00463 if self._verbose == 1:
00464 print "%s -> Create database " % time.ctime()
00465 os.mkdir(database_path)
00466 os.chdir(database_path)
00467
00468 if(0 == os.path.exists(databasename)):
00469 try:
00470
00471 self._connect = sqlite.connect(databasename, autocommit = 1)
00472
00473
00474 self._db_access = self._connect.cursor()
00475
00476
00477
00478 sql = "CREATE TABLE error(e_id INTEGER NOT NULL PRIMARY KEY, e_number INT(10) NOT NULL, e_name CHAR(200) NOT NULL, e_description CHAR(400) NULL);"
00479 self._db_access.execute(sql)
00480
00481 sql = "CREATE TABLE host (h_id INTEGER NOT NULL PRIMARY KEY, h_ip_address CHAR(15) NOT NULL, h_hostname CHAR(30) NULL);"
00482 self._db_access.execute(sql)
00483
00484 sql = "CREATE TABLE host_project (hp_h_id INTEGER UNSIGNED NOT NULL, hp_p_id INTEGER UNSIGNED NOT NULL);"
00485 self._db_access.execute(sql)
00486
00487 sql = "CREATE TABLE messages (m_id INTEGER NOT NULL PRIMARY KEY, m_date DATE NOT NULL, m_time TIME NOT NULL, m_error_string TEXT NOT NULL, m_line_number INT(7) NOT NULL, host_h_id INT(10) NOT NULL, error_e_id INT(10) NOT NULL);"
00488 self._db_access.execute(sql)
00489
00490 sql = "CREATE TABLE project (p_id INTEGER NOT NULL PRIMARY KEY, p_name CHAR(100) NOT NULL);"
00491 self._db_access.execute(sql)
00492
00493
00494
00495 error_file_fd = open(error, 'r')
00496 content = error_file_fd.readline()
00497 x = 0
00498 if self._verbose == 1:
00499 print "%s -> Initialising database ...\n" % time.ctime()
00500 z = 0
00501 while(1):
00502 if(content == "\n" or content == "\t"):
00503 content = error_file_fd.readline
00504 else:
00505
00506 content = content.lstrip("{")
00507 content_list = content.split(",")
00508 left = content_list[0].strip()
00509 if (left == '0' or left == '1'):
00510 content = error_file_fd.readline()
00511 else:
00512 if self._verbose == 1:
00513 x += 1
00514
00515 if (0 == x%2):
00516 if z == 0:
00517 sys.stdout.write("-\r")
00518 sys.stdout.flush()
00519 z = 1
00520 elif z == 1:
00521 sys.stdout.write("\\\r")
00522 sys.stdout.flush()
00523 z = 2
00524 elif z == 2:
00525 sys.stdout.write("|\r")
00526 sys.stdout.flush()
00527 z = 3
00528 elif z == 3:
00529 sys.stdout.write("/\r")
00530 sys.stdout.flush()
00531 z = 4
00532 elif z == 4:
00533 sys.stdout.write("-\r")
00534 sys.stdout.flush()
00535 z = 5
00536 elif z == 5:
00537 sys.stdout.write("\\\r")
00538 sys.stdout.flush()
00539 z = 6
00540 elif z == 6:
00541 sys.stdout.write("|\r")
00542 sys.stdout.flush()
00543 z = 7
00544 elif z == 7:
00545 sys.stdout.write("/\r")
00546 sys.stdout.flush()
00547 z = 0
00548 sys.stdout.flush
00549 right = content_list[1].strip()
00550 sql = "INSERT INTO error (e_number, e_name) VALUES (%s, \"%s\");" % (left, right)
00551 self._db_access.execute(sql)
00552 content = error_file_fd.readline()
00553 if(content == ''):
00554 break
00555
00556 error_file_fd.close()
00557 sql = "INSERT INTO error (e_number, e_name, e_description) VALUES (%s, \"%s\", \"%s\");" % (999999, "unknown", "unknown error number")
00558 self._db_access.execute(sql)
00559
00560
00561 sql = 'INSERT INTO project (p_name) VALUES ("%s")' % project
00562 self._db_access.execute(sql)
00563
00564 for i in range(len(serverlist)):
00565
00566 sql = 'INSERT INTO host (h_ip_address) VALUES ("%s")' % serverlist[i][0]
00567 self._db_access.execute(sql)
00568
00569 sql = 'SELECT * FROM host WHERE h_ip_address = "%s"' % serverlist[i][0]
00570 data = self._db_access.execute(sql)
00571 data = self._db_access.fetchall()
00572 host_id = data[0][0]
00573
00574 sql = 'SELECT * FROM project WHERE p_name = "%s"' % project
00575 data = self._db_access.execute(sql)
00576 data = self._db_access.fetchall()
00577 project_id = data[0][0]
00578
00579 sql = 'INSERT INTO host_project (hp_h_id, hp_p_id) VALUES (%s, %s)' % (host_id, project_id)
00580 data = self._db_access.execute(sql)
00581
00582 if self._verbose == 1:
00583 print "\n%s -> Database new created !" % time.ctime()
00584 except:
00585 print "%s -> Problem creating database!" % time.ctime()
00586 os.rmdir(self._database_path)
00587 os._exit(-1)
00588 else:
00589 try:
00590
00591
00592 self._connect = sqlite.connect(databasename, autocommit=1)
00593
00594
00595 self._db_access = self._connect.cursor()
00596
00597
00598 sql = "SELECT * FROM messages"
00599 self._db_access.execute(sql)
00600 data = self._db_access.fetchall()
00601 if (0 == len(data)):
00602 print "%s -> No data in table \"messages\" !" % (time.ctime())
00603 else:
00604 print "%s -> Database holds %s error messages !" % (time.ctime(), len(data))
00605
00606
00607 sql = "SELECT * FROM error"
00608 self._db_access.execute(sql)
00609 data = self._db_access.fetchall()
00610 if (0 == len(data)):
00611 print "%s -> Database corruption detected: Missing data in table \"error\".\n\nIt's recommended to delete the database and initialise it again! It seems the original intialisation process was not completed.\n" % (time.ctime())
00612 else:
00613 print "%s -> Database holds %s defined error numbers !" % (time.ctime(), len(data))
00614
00615
00616 sql = "SELECT * FROM host_project"
00617 self._db_access.execute(sql)
00618 data = self._db_access.fetchall()
00619 if (0 == len(data)):
00620 print "%s -> Database corruption detected: Missing connection between table \"host\" and \"project\".\n\nIt's recommended to delete the database and initialise it again! It seems the original intialisation process was not completed.\n" % (time.ctime())
00621 else:
00622 print "%s -> Database holds %s defined connections between table \"project\" and \"host\" !" % (time.ctime(), len(data))
00623
00624
00625 sql = "SELECT * FROM project"
00626 self._db_access.execute(sql)
00627 data = self._db_access.fetchall()
00628 if (0 == len(data)):
00629 print "%s -> Database corruption detected: Missing project, insert new project \"%s\" into database!\n\nIt's recommended to delete the database and initialise it again! It seems the original intialisation process was not completed.\n" % (time.ctime(), project)
00630
00631 sql = 'INSERT INTO project (p_name) VALUES ("%s")' % project
00632 self._db_access.execute(sql)
00633 else:
00634 print "%s -> Database holds %s defined projects !" % (time.ctime(), len(data))
00635
00636
00637 sql = "SELECT * FROM host"
00638 self._db_access.execute(sql)
00639 data = self._db_access.fetchall()
00640 if (0 == len(data)):
00641 print "%s -> Database corruption detected: Missing data in table \"host\"\n\nIt's recommended to delete the database and initialise it again! It seems the original intialisation process was not completed.\n" % (time.ctime())
00642 else:
00643 print "%s -> Database holds %s defined hosts !" % (time.ctime(), len(data))
00644
00645
00646 for i in range(len(serverlist)):
00647
00648 sql = 'SELECT * FROM host WHERE h_ip_address = "%s"' % serverlist[i][0]
00649 self._db_access.execute(sql)
00650 data = self._db_access.fetchall()
00651 if (0 == len(data)):
00652 if self._verbose == 1:
00653 print "%s -> Insert new host \"%s\" into database !" % (time.ctime(), serverlist[i][0])
00654
00655 sql = 'INSERT INTO host (h_ip_address) VALUES ("%s")' % (serverlist[i][0])
00656 self._db_access.execute(sql)
00657
00658 sql = 'SELECT * FROM host WHERE h_ip_address = "%s"' % serverlist[i][0]
00659 data = self._db_access.execute(sql)
00660 data = self._db_access.fetchall()
00661 host_id = data[0]["h_id"]
00662
00663 sql = 'SELECT * FROM project WHERE p_name = "%s"' % project
00664 data = self._db_access.execute(sql)
00665 data = self._db_access.fetchall()
00666 project_id = data[0][0]
00667
00668 sql = 'INSERT INTO host_project (hp_h_id, hp_p_id) VALUES (%s, %s)' % (host_id, project_id)
00669 data = self._db_access.execute(sql)
00670 except sqlite.DatabaseError, e:
00671 print "%s -> %s" % (time.ctime(), e)
00672 os._exit(-1)
00673
00674 def execute_sql(self, wait, database_obj, sql):
00675 '''
00676 This function tries to get access to a database for "wait" seconds. Either the sql query gets executed or the if no acccess is possible the program exits.
00677 '''
00678 for i in range(0, wait):
00679 try:
00680 database_obj.execute(sql)
00681 return 0, database_obj
00682 except sqlite.OperationalError:
00683 if self._verbose == 1:
00684 if i%20 == 0:
00685 text = "%s -> database temporary locked - keep trying for another %d seconds ...." % (time.ctime(), wait-i)
00686 print text
00687 time.sleep(1)
00688 except:
00689 if self._verbose == 1:
00690 print "%s -> database query execution error" % (time.ctime())
00691
00692 return -1, database_obj
00693
00694 def get_access_cursor(self):
00695 '''
00696 This function returns the database access cursor.
00697 '''
00698 return self._db_access
00699
00700 def get_database_path(self):
00701 '''
00702 This function returns the database path
00703 '''
00704 return self._database_path
00705
00706
00707
00708 class ClientThread(threading.Thread):
00709 '''
00710 This class gets the information from the server and puts it into the database !
00711 '''
00712 def __init__(self, shared, db_object, address, port, cl_cert, cl_cert_path, ca_cert, ca_cert_path, verbose, mail_address, mail_ignore_error, smtp_server, smtp_pass, smtp_from, user, interval, filelist):
00713 '''
00714 Constructor
00715 '''
00716
00717 self._file_list = filelist
00718 self._interval = interval
00719 self._verbose = verbose
00720 self._share = shared
00721 self._address = address
00722 self._port = port
00723 self._db_access = db_object
00724 self._client_certificate = cl_cert
00725 self._client_certificate_path = cl_cert_path
00726 self._client_ca = ca_cert
00727 self._client_ca_path = ca_cert_path
00728 threading.Thread.__init__(self)
00729
00730 self._xml_file_parser = make_parser()
00731
00732 self._xml_file_parser.setFeature(feature_namespaces, 0)
00733 self._smtp_password = smtp_pass
00734 self._mail_obj = []
00735
00736 if self._smtp_password != None:
00737 for i in range(len(mail_address)):
00738 obj = Mail(mail_address[i], smtp_server, smtp_pass, smtp_from, user, verbose)
00739 self._mail_obj.append((obj, mail_address[i]))
00740
00741 for i in range(len(self._mail_obj)):
00742 name = self._address+"-"+self._mail_obj[i][1]
00743 self._mail_obj[i][0].create_content(name)
00744
00745
00746 self._my_handler = MyContentHandler(self._db_access, self._address, mail_ignore_error, self._mail_obj, self._verbose )
00747 self._xml_file_parser.setContentHandler(self._my_handler)
00748
00749 self._stop_thread = False
00750
00751 def run(self):
00752 '''
00753 This functions overwrites the standard run method.
00754 '''
00755 filenames = []
00756
00757 if ( (0 == len(self._file_list)) & (self._stop_thread == False)):
00758
00759 try:
00760 if self._verbose == 1:
00761 print "%s -> Client %d connecting to server %s" % (time.ctime(), thread.get_ident(), self._address)
00762 try:
00763
00764
00765 connect = self._connect_to_server(self._address, self._port)
00766
00767
00768 except:
00769 if self._verbose == 1:
00770 print "%s -> Could not connect to host \"%s\"" % (time.ctime(), self._address)
00771 if (self._smtp_password != None):
00772 for g in range(len(self._mail_obj)):
00773 self._mail_obj[g][0].delete_content()
00774 self._stop_thread = True
00775
00776 if (self._stop_thread == False):
00777
00778 try:
00779 if self._verbose == 1:
00780 print "%s -> Get file names !!!" % time.ctime()
00781 filenames = connect.get_file_list()
00782 if ((-3 == filenames) & (self._stop_thread == False)):
00783
00784 check = self._wait(connect)
00785 if check == 0:
00786 filenames = connect.get_file_list()
00787 if (-3 == xml_content):
00788
00789 self._stop_thread = True
00790 if ((-2 == filenames) & (self._stop_thread == False)):
00791 if self._verbose == 1:
00792 print "%s -> RPC calls disabled !" % time.ctime()
00793 self._stop_thread = True
00794 if ((filenames == 0) & (self._stop_thread == False)):
00795 filenames = []
00796 if self._verbose == 1:
00797 print "%s -> %s files to fetch " % (time.ctime(), len(filenames))
00798 except:
00799 if self._verbose == 1:
00800 print "%s -> Could not connect (check) to IP \"%s\"" % (time.ctime(), self._address)
00801 if (self._smtp_password != None):
00802 for g in range(len(self._mail_obj)):
00803 self._mail_obj[g][0].delete_content()
00804 self._stop_thread = True
00805
00806 if ( (0 < len(filenames)) & (self._stop_thread == False)):
00807
00808 for g in range(len(filenames)):
00809 xml_content = connect.get_my_xml_file(filenames[g])
00810 if ( -3 == xml_content ):
00811 if self._verbose == 1:
00812 print "%s -> Parsing in progress ..." % time.ctime()
00813 check = self._wait(connect)
00814 if check == 0:
00815 xml_content = connect.get_my_xml_file(filenames[g])
00816 if (-3 == xml_content):
00817
00818 self._stop_thread = True
00819 break
00820 if ( -2 == xml_content ):
00821 if self._verbose == 1:
00822 print "%s -> RPC calls disabled !" % time.ctime()
00823 self._stop_thread = True
00824 break
00825 if (xml_content == "no file"):
00826
00827 if self._verbose == 1:
00828 print "%s -> No file available !!!" % time.ctime()
00829 self._stop_thread = True
00830 break
00831
00832
00833 name = "%s_client_xml_file_%d.xml" % (self._address, g)
00834
00835 self._share.lock()
00836 try:
00837 c = g
00838 while(1):
00839 if(0 == os.path.exists(name)):
00840
00841 name = "%s_client_xml_file_%d.xml" % (self._address, c)
00842 file_fd = open(name, 'w')
00843 file_fd.write(xml_content)
00844 file_fd.close()
00845 self._file_list.append(name)
00846 break
00847 name = "%s_client_xml_file_%d.xml" % (self._address, c)
00848 c += 1
00849 finally:
00850
00851 self._share.release()
00852 except SSL.SSLError, e:
00853 if self._verbose == 1:
00854 print "%s -> Connection error (server \"%s\"): %s !" % (time.ctime(), self._address, e)
00855 self._stop_thread = True
00856 except:
00857 if self._verbose == 1:
00858 print "%s -> Error connecting to server -> \"%s\" !" % (time.ctime(), self._address)
00859 self._stop_thread = True
00860
00861 if ( (0 < len(self._file_list)) & (self._stop_thread == False)):
00862
00863 for g in range(len(self._file_list)):
00864 name = self._file_list[g]
00865 if self._address == None:
00866 dbpath = "%s/"% self._db_access.get_database_path()
00867 ad = re.sub(dbpath, "", self._file_list[g])
00868 print ad
00869 ad = re.sub('_client_xml_file_[0-9]+.xml', "", ad)
00870 self._my_handler.set_ip(ad)
00871 try:
00872 file_fd = open(name, 'r')
00873 except IOError, e:
00874 print e
00875 self._stop_thread = True
00876 break
00877
00878
00879 z = 0
00880 while(1):
00881 try:
00882 if ((0 == self._share.set_variable(self)) & (self._stop_thread == False)):
00883 try:
00884 self._xml_file_parser.parse(file_fd)
00885 except xml.sax.SAXParseException,e :
00886 if self._verbose == 1:
00887 print "%s -> sax parser error: %s" % ( time.ctime(), e)
00888
00889 self._share.reset_variable()
00890 if (None != self._smtp_password):
00891
00892 for a in range(len(self._mail_obj)):
00893 self._mail_obj[a][0].send_mail(self._mail_obj[a][1], self._address)
00894
00895 file_fd.close()
00896 os.remove(name)
00897 if (self._smtp_password != None):
00898 for g in range(len(self._mail_obj)):
00899 self._mail_obj[g][0].delete_content()
00900 break
00901 else:
00902 z += 1
00903 if z == 10:
00904 if self._verbose == 1:
00905 print "%s -> can not access database --> terminating" % time.ctime()
00906 break
00907
00908 except AssertionError:
00909 file_fd.close()
00910 if self._verbose == 1:
00911 print "%s -> can not access database -> terminating" % time.ctime()
00912 if (self._smtp_password != None):
00913 for g in range(len(self._mail_obj)):
00914 self._mail_obj[g][0].delete_content()
00915 break
00916
00917 except:
00918 file_fd.close()
00919 if self._verbose == 1:
00920 print "%s -> problem processing XML file -> terminating" % time.ctime()
00921 if (self._smtp_password != None):
00922 for g in range(len(self._mail_obj)):
00923 self._mail_obj[g][0].delete_content()
00924 break
00925
00926 else:
00927 if (self._smtp_password != None):
00928 for g in range(len(self._mail_obj)):
00929 self._mail_obj[g][0].delete_content()
00930 print "%s -> client_thread %s STIRBT nun !!!" % (time.ctime(),thread.get_ident())
00931
00932 def _wait(self, connect_object):
00933 '''
00934 This function waits if the server is busy parsing the log file (busy waiting).
00935 '''
00936 counter = 0
00937 max_sleeping_time = 60 * self._interval
00938 sleeped = 0
00939 while(1):
00940
00941 check = connect_object.rpc_check_availabitity()
00942 if check == 0:
00943 return 0
00944 counter += 1
00945 if (counter == 30):
00946 if self._verbose == 1:
00947 print "%s -> Server takes a long time to parse file -> thread terminating" % time.ctime()
00948 return -1
00949
00950 sleeped += 10
00951 if sleeped > max_sleeping_time:
00952 return -1
00953 time.sleep(10)
00954
00955 def _connect_to_server(self, server, port):
00956 '''
00957 This function establishes the connection to the server.
00958 '''
00959 serverus = server
00960 ctx = self.create_ctx()
00961
00962 urladdress = "https://%s:%d" % (serverus, port)
00963 server = Server(urladdress, SSL_Transport(ctx))
00964
00965 return server
00966
00967 def create_ctx(self):
00968 '''
00969 The function creates the necessary SSL context using certificates.
00970 '''
00971 ctx = SSL.Context(protocol='sslv3')
00972 ctx.load_cert(self._client_certificate_path+"/"+self._client_certificate)
00973 ctx.load_client_CA(self._client_ca_path+"/"+self._client_ca)
00974
00975
00976 ctx.set_session_id_ctx('server')
00977 return ctx
00978
00979
00980
00981 class WorkerThread(threading.Thread):
00982 '''
00983 This class is responsible for starting the ClientThreads within a certain interval.
00984 '''
00985
00986 def __init__(self, shared, db_object, interval, serverlist, cl_cert, cl_cert_path, ca_cert, ca_cert_path, verbose, mail_address, mail_ignore_error, smtp_server, smtp_pass, smtp_from, user):
00987 '''
00988 Constructor
00989 '''
00990 self._verbose = verbose
00991 self._share = shared
00992 self._db_access = db_object
00993 self._interval = interval
00994 self._serverlist = serverlist
00995 self._client_certificate = cl_cert
00996 self._client_certificate_path = cl_cert_path
00997 self._client_ca = ca_cert
00998 self._client_ca_path = ca_cert_path
00999 self._mail_address = mail_address
01000 self._mail_ignore_error = mail_ignore_error
01001 self._smtp_server = smtp_server
01002 self._smtp_pass = smtp_pass
01003 self._smtp_from = smtp_from
01004 self._smtp_user = user
01005 self._list = []
01006 threading.Thread.__init__(self)
01007
01008 def run(self):
01009 '''
01010 This function overwrites the standard run method.
01011 '''
01012
01013 temp_list = []
01014
01015 while(1):
01016
01017
01018 os.path.walk(self._db_access.get_database_path(), self._parse_directory, self._list)
01019 if (0 < len(self._list)):
01020 temp_list = copy.deepcopy(self._list)
01021 self._thread = ClientThread(self. _share, self._db_access, None, None, self._client_certificate, self._client_certificate_path, self._client_ca, self._client_ca_path, self._verbose, self._mail_address, self._mail_ignore_error, self._smtp_server, self._smtp_pass, self._smtp_from, self._smtp_user, self._interval, temp_list)
01022 self._thread.start()
01023 del self._list[:]
01024
01025
01026 for i in range(len(self._serverlist)):
01027 dummy_list = []
01028
01029 self._thread = ClientThread(self. _share, self._db_access, self._serverlist[i][0], self._serverlist[i][1], self._client_certificate, self._client_certificate_path, self._client_ca, self._client_ca_path, self._verbose, self._mail_address, self._mail_ignore_error, self._smtp_server, self._smtp_pass, self._smtp_from, self._smtp_user, self._interval, dummy_list)
01030 self._thread.start()
01031
01032 if self._verbose == 1:
01033 print "\n%s -> sleeping for %d minutes\n" % (time.ctime(), (self._interval))
01034 time.sleep(self._interval*60)
01035
01036
01037 def _parse_directory(self, arg, dirname, fnames):
01038 '''
01039 This function "walks" through a given directory and considers all srbLOG*.gz files. The name and last modified time are saved in a list (2 dimensional array). The function should be used with os.path.walk(path, function_name, arg)!
01040 '''
01041 d = os.getcwd()
01042
01043 try:
01044 os.chdir(dirname)
01045 except:
01046 print "could not find directory \"%s\"" % dirname
01047 return -1
01048
01049 for f in fnames:
01050
01051 if (not os.path.isfile(f)) or (None == re.search('client_xml_file_[0-9]+.xml', f)):
01052 continue
01053 else:
01054
01055 filus = dirname+"/"+f
01056 self._list.append(filus)
01057
01058 os.chdir(d)
01059
01060
01061
01062
01063 class Mutex:
01064 '''
01065 This class makes sure that only one client is writing into the database. This is necessary since sqlite is not trhead safe within a process! Futhermore is provide the possiblity to synchronise thread accessing critical sections.
01066 '''
01067
01068 _db_locked = threading.Lock()
01069
01070 _locked = threading.Lock()
01071
01072 def __init__(self):
01073 '''
01074 Constructor
01075 '''
01076 self.writing = 0
01077 self._the_thread = 0
01078
01079 def set_variable(self, threadus):
01080 '''
01081 set variable writing
01082 '''
01083 Mutex._db_locked.acquire()
01084
01085 if self.writing == 0:
01086
01087 self.writing = 1
01088 self._the_thread = threadus
01089 Mutex._db_locked.release()
01090 return 0
01091 else:
01092 if (1 != self._the_thread.isAlive()):
01093
01094 self.writing = 0
01095 Mutex._db_locked.release()
01096 return -1
01097
01098 def reset_variable(self):
01099 '''
01100 reset variable writing
01101 '''
01102 Mutex._db_locked.acquire()
01103 self.writing = 0
01104 self._the_thread = 0
01105 Mutex._db_locked.release()
01106
01107 def lock(self):
01108 '''
01109 This functions acquires the look.
01110 '''
01111 Mutex._locked.acquire()
01112
01113 def release(self):
01114 '''
01115 This function releases the lock.
01116 '''
01117 Mutex._locked.release()
01118