changeset: 10787:b694b75d2e33 user: Dejan Muhamedagic date: Mon Jul 18 12:35:57 2011 +0200 summary: High: Shell: set of commands to examine logs, reports, etc diff -r 9609937061d7 -r b694b75d2e33 Makefile.am --- a/Makefile.am Fri Jul 15 10:51:00 2011 +1000 +++ b/Makefile.am Mon Jul 18 12:35:57 2011 +0200 @@ -39,8 +39,10 @@ install-exec-local: $(INSTALL) -d $(DESTDIR)/$(LCRSODIR) $(INSTALL) -d -m 750 $(DESTDIR)/$(CRM_CONFIG_DIR) $(INSTALL) -d -m 750 $(DESTDIR)/$(CRM_STATE_DIR) + $(INSTALL) -d -m 750 $(DESTDIR)/$(CRM_CACHE_DIR) -chown $(CRM_DAEMON_USER):$(CRM_DAEMON_GROUP) $(DESTDIR)/$(CRM_CONFIG_DIR) -chown $(CRM_DAEMON_USER):$(CRM_DAEMON_GROUP) $(DESTDIR)/$(CRM_STATE_DIR) + -chown $(CRM_DAEMON_USER):$(CRM_DAEMON_GROUP) $(DESTDIR)/$(CRM_CACHE_DIR) if BUILD_CS_SUPPORT rm -f $(DESTDIR)$(LCRSODIR)/pacemaker.lcrso $(DESTDIR)$(LCRSODIR)/service_crm.so cp $(DESTDIR)$(libdir)/service_crm.so $(DESTDIR)$(LCRSODIR)/pacemaker.lcrso diff -r 9609937061d7 -r b694b75d2e33 configure.ac --- a/configure.ac Fri Jul 15 10:51:00 2011 +1000 +++ b/configure.ac Mon Jul 18 12:35:57 2011 +0200 @@ -460,6 +460,10 @@ CRM_STATE_DIR=${localstatedir}/run/crm AC_DEFINE_UNQUOTED(CRM_STATE_DIR,"$CRM_STATE_DIR", Where to keep state files and sockets) AC_SUBST(CRM_STATE_DIR) +CRM_CACHE_DIR=${localstatedir}/cache/crm +AC_DEFINE_UNQUOTED(CRM_CACHE_DIR,"$CRM_CACHE_DIR", Where crm shell keeps the cache) +AC_SUBST(CRM_CACHE_DIR) + PE_STATE_DIR="${localstatedir}/lib/pengine" AC_DEFINE_UNQUOTED(PE_STATE_DIR,"$PE_STATE_DIR", Where to keep PEngine outputs) AC_SUBST(PE_STATE_DIR) diff -r 9609937061d7 -r b694b75d2e33 doc/crm.8.txt --- a/doc/crm.8.txt Fri Jul 15 10:51:00 2011 +1000 +++ b/doc/crm.8.txt Mon Jul 18 12:35:57 2011 +0200 @@ -13,7 +13,7 @@ crm - Pacemaker command line interface f SYNOPSIS -------- -*crm* [-D output_type] [-f file] [-hFRDw] [--version] [args] +*crm* [-D output_type] [-f file] [-H hist_src] [-hFRDw] [--version] [args] DESCRIPTION @@ -67,6 +67,11 @@ OPTIONS Make `crm` wait for the 
transition to finish. Applicable only for commands such as "resource start." +*-H, --history*='DIR|FILE':: + The `history` commands can examine either live cluster + (default) or a report generated by `hb_report`. Use this + option to specify a directory or file containing the report. + *-h, --help*:: Print help page. @@ -2346,6 +2351,254 @@ Example: simulate ............... +[[cmdhelp_history,cluster history]] +=== `history` + +Examining Pacemaker's history is a particularly involved task. +The number of subsystems to be considered, the complexity of the +configuration, and the set of various information sources, most +of which are not exactly human readable, keep analyzing resource +or node problems accessible to only the most knowledgeable. Or, +depending on the point of view, to the most persistent. The +following set of commands has been devised in hope to make +cluster history more accessible. + +Of course, looking at _all_ history could be time consuming +regardless of how good tools at hand are. Therefore, one should +first say which period he or she wants to analyze. If not +otherwise specified, the last hour is considered. Logs and other +relevant information is collected using `hb_report`. Since this +process takes some time and we always need fresh logs, +information is refreshed in a much faster way using `pssh(1)`. If +`python-pssh` is not found on the system, examining live cluster +is still possible though not as comfortable. + +Apart from examining live cluster, events may be retrieved from a +report generated by `hb_report` (see also the `-H` option). In +that case we assume that the period stretching the whole report +needs to be investigated. Of course, it is still possible to +further reduce the time range. + +==== `info` + +The `info` command shows most important information about the +cluster. + +Usage: +............... + info +............... +Example: +............... + info +............... 
+ +[[cmdhelp_history_latest,show latest news from the cluster]] +==== `latest` + +The `latest` command shows a bit of recent history, more +precisely whatever happened since the last cluster change (the +latest transition). + +Usage: +............... + latest +............... +Example: +............... + latest +............... + +[[cmdhelp_history_limit,limit timeframe to be examined]] +==== `limit` + +All history commands look at events within certain period. It +defaults to the last hour for the live cluster source. There is +no limit for the `hb_report` source. Use this command to set the +timeframe. + +The time period is parsed by the dateutil python module. It +covers wide range of date formats. For instance: + +- 3:00 (today at 3am) +- 15:00 (today at 3pm) +- 2010/9/1 2pm (September 1st 2010 at 2pm) + +We won't bother to give definition of the time specification in +usage below. Either use common sense or read the +http://labix.org/python-dateutil[dateutil] documentation. + +If dateutil is not available, then the time is parsed using +strptime and only the kind as printed by `date(1)` is allowed: + +- Tue Sep 15 20:46:27 CEST 2010 + +Usage: +............... + limit [] +............... +Examples: +............... + limit 10:15 + limit 15h22m 16h + limit "Sun 5 20:46" "Sun 5 22:00" +............... + +[[cmdhelp_history_source,set source to be examined]] +==== `source` + +Events to be examined can come from the current cluster or from a +`hb_report` report. This command sets the source. `source live` +sets source to the running cluster and system logs. If no source +is specified, the current source information is printed. + +In case a report source is specified as a file reference, the file +is going to be unpacked in place where it resides. This directory +is not removed on exit. + +Usage: +............... + source [||live] +............... +Examples: +............... 
+ source live + source /tmp/customer_case_22.tar.bz2 + source /tmp/customer_case_22 + source +............... + +[[cmdhelp_history_refresh,refresh live report]] +==== `refresh` + +This command makes sense only for the `live` source and makes +`crm` collect the latest logs and other relevant information from +the logs. If you want to make a completely new report, specify +`force`. + +Usage: +............... + refresh [force] +............... + +[[cmdhelp_history_detail,set the level of detail shown]] +==== `detail` + +How much detail to show from the logs. + +Usage: +............... + detail + + detail_level :: small integer (defaults to 0) +............... +Example: +............... + detail 1 +............... + +[[cmdhelp_history_setnodes,set the list of cluster nodes]] +==== `setnodes` + +In case the host this program runs on is not part of the cluster, +it is necessary to set the list of nodes. + +Usage: +............... + setnodes node [ ...] +............... +Example: +............... + setnodes node_a node_b +............... + +[[cmdhelp_history_resource,resource failed actions]] +==== `resource` + +Show status changes and any failures that happened on a resource. + +Usage: +............... + resource [ ...] +............... +Example: +............... + resource mydb +............... + +[[cmdhelp_history_node,node events]] +==== `node` + +Show important events that happened on a node. Important events +are node lost and join, standby and online, and fence. + +Usage: +............... + node [ ...] +............... +Example: +............... + node node1 +............... + +[[cmdhelp_history_log,log content]] +==== `log` + +Show logs for a node or combined logs of all nodes. + +Usage: +............... + log [] +............... +Example: +............... + log node-a +............... + +[[cmdhelp_history_peinputs,list or get PE input files]] +==== `peinputs` + +Every event in the cluster results in generating one or more +Policy Engine (PE) files. 
These files describe future motions of +resources. The files are listed along with the node where they +were created (the DC at the time). The `get` subcommand will copy +all PE input files to the current working directory (and use ssh +if necessary). + +The `show` subcommand will print actions planned by the PE and +run graphviz (`dotty`) to display a graphical representation. Of +course, for the latter an X11 session is required. This command +invokes `ptest(8)` in background. + +The `showdot` subcommand runs graphviz (`dotty`) to display a +graphical representation of the `.dot` file which has been +included in the report. Essentially, it shows the calculation +produced by `pengine` which is installed on the node where the +report was produced. + +If the PE input file number is not provided, it defaults to the +last one, i.e. the last transition. If the number is negative, +then the corresponding transition relative to the last one is +chosen. + +Usage: +............... + peinputs list [{|} ...] + peinputs get [{|} ...] + peinputs show [] [nograph] [v...] [scores] [actions] [utilization] + peinputs showdot [] + + range :: : +............... +Example: +............... + peinputs get 440:444 446 + peinputs show + peinputs show 444 + peinputs show -1 + peinputs showdot 444 +............... 
+ === `end` (`cd`, `up`) The `end` command ends the current level and the user moves to diff -r 9609937061d7 -r b694b75d2e33 shell/modules/Makefile.am --- a/shell/modules/Makefile.am Fri Jul 15 10:51:00 2011 +1000 +++ b/shell/modules/Makefile.am Mon Jul 18 12:35:57 2011 +0200 @@ -33,6 +33,8 @@ modules = __init__.py \ msg.py \ parse.py \ ra.py \ + report.py \ + log_patterns.py \ singletonmixin.py \ template.py \ term.py \ diff -r 9609937061d7 -r b694b75d2e33 shell/modules/cibconfig.py --- a/shell/modules/cibconfig.py Fri Jul 15 10:51:00 2011 +1000 +++ b/shell/modules/cibconfig.py Mon Jul 18 12:35:57 2011 +0200 @@ -20,6 +20,7 @@ import subprocess import copy import xml.dom.minidom import re +import time from singletonmixin import Singleton from userprefs import Options, UserPrefs @@ -404,7 +405,6 @@ class CibObjectSetRaw(CibObjectSet): ''' Edit or display one or more CIB objects (XML). ''' - actions_filter = "grep LogActions: | grep -vw Leave" def __init__(self, *args): CibObjectSet.__init__(self, *args) self.obj_list = cib_factory.mkobj_list("xml",*args) @@ -470,19 +470,6 @@ class CibObjectSetRaw(CibObjectSet): def ptest(self, nograph, scores, utilization, actions, verbosity): if not cib_factory.is_cib_sane(): return False - if verbosity: - if actions: - verbosity = 'v' * max(3,len(verbosity)) - ptest = "ptest -X -%s" % verbosity.upper() - if scores: - ptest = "%s -s" % ptest - if utilization: - ptest = "%s -U" % ptest - if user_prefs.dotty and not nograph: - fd,dotfile = mkstemp() - ptest = "%s -D %s" % (ptest,dotfile) - else: - dotfile = None doc = cib_factory.objlist2doc(self.obj_list) cib = doc.childNodes[0] status = cib_status.get_status() @@ -490,21 +477,9 @@ class CibObjectSetRaw(CibObjectSet): common_err("no status section found") return False cib.appendChild(doc.importNode(status,1)) - # ptest prints to stderr - if actions: - ptest = "%s 2>&1 | %s | %s" % \ - (ptest, self.actions_filter, user_prefs.pager) - else: - ptest = "%s 2>&1 | %s" % (ptest, 
user_prefs.pager) - pipe_string(ptest,doc.toprettyxml()) + graph_s = doc.toprettyxml() doc.unlink() - if dotfile: - show_dot_graph(dotfile) - vars.tmpfiles.append(dotfile) - else: - if not nograph: - common_info("install graphviz to see a transition graph") - return True + return run_ptest(graph_s, nograph, scores, utilization, actions, verbosity) # # XML generate utilities @@ -1426,6 +1401,7 @@ class CibFactory(Singleton): def __init__(self): self.init_vars() self.regtest = options.regression_tests + self.last_commit_time = 0 self.all_committed = True # has commit produced error self._no_constraint_rm_msg = False # internal (just not to produce silly messages) self.supported_cib_re = "^pacemaker-1[.][012]$" @@ -1598,6 +1574,8 @@ class CibFactory(Singleton): print "Remove queue:" for obj in self.remove_queue: obj.dump_state() + def last_commit_at(self): + return self.last_commit_time def commit(self,force = False): 'Commit the configuration to the CIB.' if not self.doc: @@ -1608,6 +1586,8 @@ class CibFactory(Singleton): cnt = self.commit_doc(force) if cnt: # reload the cib! 
+ if is_live_cib(): + self.last_commit_time = time.time() self.reset() self.initialize() return self.all_committed diff -r 9609937061d7 -r b694b75d2e33 shell/modules/completion.py --- a/shell/modules/completion.py Fri Jul 15 10:51:00 2011 +1000 +++ b/shell/modules/completion.py Mon Jul 18 12:35:57 2011 +0200 @@ -22,6 +22,7 @@ import readline from cibconfig import CibFactory from cibstatus import CibStatus +from report import Report from levels import Levels from ra import * from vars import Vars @@ -156,6 +157,22 @@ def ra_classes_list(idx,delimiter = Fals if delimiter: return ':' return ra_classes() +def report_rsc_list(idx,delimiter = False): + if delimiter: + return ' ' + return crm_report.rsc_list() +def report_node_list(idx,delimiter = False): + if delimiter: + return ' ' + return crm_report.node_list() +def report_pe_cmd_list(idx,delimiter = False): + if delimiter: + return ' ' + return ["list","get","show","showdot"] +def report_pe_list(idx,delimiter = False): + if delimiter: + return ' ' + return crm_report.peinputs_list() # # completion for primitives including help for parameters @@ -463,6 +480,12 @@ completer_lists = { "_regtest" : None, "_objects" : None, }, + "history" : { + "resource" : (report_rsc_list,loop), + "node" : (report_node_list,loop), + "log" : (report_node_list,loop), + "peinputs" : (report_pe_cmd_list,report_pe_list,loop), + }, } def get_completer_list(level,cmd): 'Return a list of completer functions.' @@ -474,5 +497,6 @@ user_prefs = UserPrefs.getInstance() vars = Vars.getInstance() cib_status = CibStatus.getInstance() cib_factory = CibFactory.getInstance() +crm_report = Report.getInstance() # vim:ts=4:sw=4:et: diff -r 9609937061d7 -r b694b75d2e33 shell/modules/crm_pssh.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/shell/modules/crm_pssh.py Mon Jul 18 12:35:57 2011 +0200 @@ -0,0 +1,160 @@ +# Modified pssh +# Copyright (c) 2011, Dejan Muhamedagic +# Copyright (c) 2009, Andrew McNabb +# Copyright (c) 2003-2008, Brent N. 
Chun + +"""Parallel ssh to the set of nodes in hosts.txt. + +For each node, this essentially does an "ssh host -l user prog [arg0] [arg1] +...". The -o option can be used to store stdout from each remote node in a +directory. Each output file in that directory will be named by the +corresponding remote node's hostname or IP address. +""" + +import fcntl +import os +import sys +import glob +import re + +parent, bindir = os.path.split(os.path.dirname(os.path.abspath(sys.argv[0]))) +if os.path.exists(os.path.join(parent, 'psshlib')): + sys.path.insert(0, parent) + +from psshlib import psshutil +from psshlib.manager import Manager, FatalError +from psshlib.task import Task +from psshlib.cli import common_parser, common_defaults + +from msg import * + +_DEFAULT_TIMEOUT = 60 +_EC_LOGROT = 120 + +def option_parser(): + parser = common_parser() + parser.usage = "%prog [OPTIONS] command [...]" + parser.epilog = "Example: pssh -h hosts.txt -l irb2 -o /tmp/foo uptime" + + parser.add_option('-i', '--inline', dest='inline', action='store_true', + help='inline aggregated output for each server') + parser.add_option('-I', '--send-input', dest='send_input', + action='store_true', + help='read from standard input and send as input to ssh') + parser.add_option('-P', '--print', dest='print_out', action='store_true', + help='print output as we get it') + + return parser + +def parse_args(myargs): + parser = option_parser() + defaults = common_defaults(timeout=_DEFAULT_TIMEOUT) + parser.set_defaults(**defaults) + opts, args = parser.parse_args(myargs) + return opts, args + +def show_errors(errdir, hosts): + for host in hosts: + fl = glob.glob("%s/*%s*" % (errdir,host)) + if not fl: + continue + for fname in fl: + try: + if os.stat(fname).st_size == 0: + continue + f = open(fname) + except: + continue + print "%s stderr:" % host + print ''.join(f) + f.close() + +def do_pssh(l, opts): + if opts.outdir and not os.path.exists(opts.outdir): + os.makedirs(opts.outdir) + if opts.errdir and 
not os.path.exists(opts.errdir): + os.makedirs(opts.errdir) + if opts.send_input: + stdin = sys.stdin.read() + else: + stdin = None + manager = Manager(opts) + user = "" + port = "" + hosts = [] + for host, cmdline in l: + cmd = ['ssh', host, '-o', 'PasswordAuthentication=no', + '-o', 'SendEnv=PSSH_NODENUM'] + if opts.options: + for opt in opts.options: + cmd += ['-o', opt] + if user: + cmd += ['-l', user] + if port: + cmd += ['-p', port] + if opts.extra: + cmd.extend(opts.extra) + if cmdline: + cmd.append(cmdline) + hosts.append(host) + t = Task(host, port, user, cmd, opts, stdin) + manager.add_task(t) + try: + statuses = manager.run() + except FatalError: + common_err("pssh to nodes failed") + show_errors(opts.errdir, hosts) + return False + + if min(statuses) < 0: + # At least one process was killed. + common_err("ssh process was killed") + show_errors(opts.errdir, hosts) + return False + # The any builtin was introduced in Python 2.5 (so we can't use it yet): + #elif any(x==255 for x in statuses): + for status in statuses: + if status == 255: + common_warn("ssh processes failed") + show_errors(opts.errdir, hosts) + return False + for status in statuses: + if status not in (0, _EC_LOGROT): + common_warn("some ssh processes failed") + show_errors(opts.errdir, hosts) + return False + return True + +def next_loglines(a, outdir, errdir): + ''' + pssh to nodes to collect new logs. + ''' + l = [] + for node,rptlog,logfile,nextpos in a: + common_debug("updating %s from %s (pos %d)" % (logfile, node, nextpos)) + cmdline = "perl -e 'exit(%d) if (stat(\"%s\"))[7]<%d' && tail -c +%d %s" % (_EC_LOGROT, logfile, nextpos-1, nextpos, logfile) + myopts = ["-q", "-o", outdir, "-e", errdir] + opts, args = parse_args(myopts) + l.append([node, cmdline]) + return do_pssh(l, opts) + +def next_peinputs(node_pe_l, outdir, errdir): + ''' + pssh to nodes to collect new logs. 
+    '''
+    l = []
+    for node,pe_l in node_pe_l:
+        r = re.search("(.*)/pengine/", pe_l[0])
+        if not r:
+            common_err("strange, %s doesn't contain string pengine" % pe_l[0])
+            continue
+        dir = "/%s" % r.group(1)
+        red_pe_l = [x.replace("%s/" % r.group(1),"") for x in pe_l]
+        common_debug("getting new PE inputs %s from %s" % (red_pe_l, node))
+        cmdline = "tar -C %s -cf - %s" % (dir, ' '.join(red_pe_l))
+        myopts = ["-q", "-o", outdir, "-e", errdir]
+        opts, args = parse_args(myopts)
+        l.append([node, cmdline])
+    return do_pssh(l, opts)
+
+# vim:ts=4:sw=4:et:
diff -r 9609937061d7 -r b694b75d2e33 shell/modules/log_patterns.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/shell/modules/log_patterns.py	Mon Jul 18 12:35:57 2011 +0200
@@ -0,0 +1,69 @@
+# Copyright (C) 2011 Dejan Muhamedagic
+#
+# log pattern specification
+#
+# patterns are grouped into one of several classes:
+#  - resources: pertaining to a resource
+#  - node: pertaining to a node
+#  - quorum: quorum changes
+#  - events: other interesting events (core dumps, etc)
+#
+# patterns are grouped based on a detail level
+# detail level 0 is the lowest, i.e. should match the least
+# number of relevant messages
+
+# NB: If you modify this file, you must follow python syntax!
+ +log_patterns = { + "resource": ( + ( # detail 0 + "lrmd:.*rsc:%%.*(start|stop)", + "lrmd:.*RA output:.*%%.*stderr", + "lrmd:.*WARN:.*Managed.*%%.*exited", + ), + ( # detail 1 + "lrmd:.*rsc:%%.*probe", + "lrmd:.*info:.*Managed.*%%.*exited", + ), + ), + "node": ( + ( # detail 0 + "%%.*Corosync.Cluster.Engine", + "%%.*Executive.Service.RELEASE", + "%%.*crm_shutdown:.Requesting.shutdown", + "%%.*pcmk_shutdown:.Shutdown.complete", + "%%.*Configuration.validated..Starting.heartbeat", + "pengine.*Scheduling Node %%", + "te_fence_node.*Exec.*%%", + "stonith-ng.*log_oper.*reboot.*%%", + "stonithd.*to STONITH.*%%", + "stonithd.*fenced node %%", + "pcmk_peer_update.*(lost|memb): %%", + "crmd.*ccm_event.*(NEW|LOST) %%", + ), + ( # detail 1 + ), + ), + "quorum": ( + ( # detail 0 + "crmd.*crm_update_quorum:.Updating.quorum.status", + "crmd.*ais.disp.*quorum.(lost|ac?quir)", + ), + ( # detail 1 + ), + ), + "events": ( + ( # detail 0 + "CRIT:", + "ERROR:", + ), + ( # detail 1 + "WARN:", + ), + ), +} + +transition_patt = ( + "crmd: .* Processing graph.*derived from (.*bz2)", # transition start + "crmd: .* Transition.*Source=(.*bz2): (Stopped|Complete|Terminated)", # and stop +) diff -r 9609937061d7 -r b694b75d2e33 shell/modules/main.py --- a/shell/modules/main.py Fri Jul 15 10:51:00 2011 +1000 +++ b/shell/modules/main.py Mon Jul 18 12:35:57 2011 +0200 @@ -172,7 +172,7 @@ def usage(rc): f = sys.stdout print >> f, """ usage: - crm [-D display_type] [-f file] [-hF] [args] + crm [-D display_type] [-f file] [-H hist_src] [-hFRDw] [--version] [args] Use crm without arguments for an interactive session. Supply one or more arguments for a "single-shot" use. 
@@ -226,8 +226,8 @@ def run(): try: opts, args = getopt.getopt(sys.argv[1:], \ - 'whdf:FRD:', ("wait","version","help","debug","file=",\ - "force","regression-tests","display=")) + 'whdf:FRD:H:', ("wait","version","help","debug","file=",\ + "force","regression-tests","display=","history=")) for o,p in opts: if o in ("-h","--help"): usage(0) @@ -250,6 +250,8 @@ Written by Dejan Muhamedagic options.interactive = False err_buf.reset_lineno() inp_file = p + elif o in ("-H","--history"): + options.history = p elif o in ("-w","--wait"): user_prefs.wait = "yes" except getopt.GetoptError,msg: diff -r 9609937061d7 -r b694b75d2e33 shell/modules/msg.py --- a/shell/modules/msg.py Fri Jul 15 10:51:00 2011 +1000 +++ b/shell/modules/msg.py Mon Jul 18 12:35:57 2011 +0200 @@ -27,6 +27,7 @@ class ErrorBuffer(Singleton): self.msg_list = [] self.mode = "immediate" self.lineno = -1 + self.written = {} def buffer(self): self.mode = "keep" def release(self): @@ -65,6 +66,10 @@ class ErrorBuffer(Singleton): self.writemsg("ERROR: %s" % self.add_lineno(s)) def warning(self,s): self.writemsg("WARNING: %s" % self.add_lineno(s)) + def one_warning(self,s): + if not s in self.written: + self.written[s] = 1 + self.writemsg("WARNING: %s" % self.add_lineno(s)) def info(self,s): self.writemsg("INFO: %s" % self.add_lineno(s)) def debug(self,s): @@ -79,12 +84,16 @@ def common_warning(s): err_buf.warning(s) def common_warn(s): err_buf.warning(s) +def warn_once(s): + err_buf.one_warning(s) def common_info(s): err_buf.info(s) def common_debug(s): err_buf.debug(s) def no_prog_err(name): err_buf.error("%s not available, check your installation"%name) +def no_file_err(name): + err_buf.error("%s does not exist"%name) def missing_prog_warn(name): err_buf.warning("could not find any %s on the system"%name) def node_err(msg, node): diff -r 9609937061d7 -r b694b75d2e33 shell/modules/report.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/shell/modules/report.py Mon Jul 18 12:35:57 2011 +0200 @@ -0,0 +1,887 
@@ +# Copyright (C) 2011 Dejan Muhamedagic +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import os +import sys +import time +import datetime +import copy +import re +import glob + +from singletonmixin import Singleton +from userprefs import Options, UserPrefs +from cibconfig import CibFactory +from vars import Vars, getuser +from term import TerminalController +from xmlutil import * +from utils import * +from msg import * +from log_patterns import log_patterns, transition_patt +_NO_PSSH = False +try: + from crm_pssh import next_loglines, next_peinputs +except: + _NO_PSSH = True + +# +# hb_report interface +# +# read hb_report generated report, show interesting stuff, search +# through logs, get PE input files, get log slices (perhaps even +# coloured nicely!) 
+# + +def mk_re_list(patt_l,repl): + 'Build a list of regular expressions, replace "%%" with repl' + l = [] + for re_l in patt_l: + l += [ x.replace("%%",repl) for x in re_l ] + if not repl: + l = [ x.replace(".*.*",".*") for x in l ] + return l + +YEAR = time.strftime("%Y") +def syslog_ts(s): + try: + # strptime defaults year to 1900 (sigh) + tm = time.strptime(' '.join([YEAR] + s.split()[0:3]),"%Y %b %d %H:%M:%S") + return time.mktime(tm) + except: + common_warn("malformed line: %s" % s) + return None + +def log_seek(f, ts, endpos = False): + ''' + f is an open log. Do binary search for the timestamp. + Return the position of the (more or less) first line with a + newer time. + ''' + first = 0 + f.seek(0,2) + last = f.tell() + if not ts: + return endpos and last or first + badline = 0 + maxbadline = 10 + common_debug("seek ts %s" % time.ctime(ts)) + while first <= last: + # we can skip some iterations if it's about few lines + if abs(first-last) < 120: + break + mid = (first+last)/2 + f.seek(mid) + log_ts = get_timestamp(f) + if not log_ts: + badline += 1 + if badline > maxbadline: + common_warn("giving up on log %s" % f.name) + return -1 + first += 120 # move forward a bit + continue + if log_ts > ts: + last = mid-1 + elif log_ts < ts: + first = mid+1 + else: + break + common_debug("sought to %s" % time.ctime(log_ts)) + return f.tell() + +def get_timestamp(f): + ''' + Get the whole line from f. The current file position is + usually in the middle of the line. + Then get timestamp and return it. + ''' + step = 30 # no line should be longer than 30 + cnt = 1 + current_pos = f.tell() + s = f.readline() + if not s: # EOF? + f.seek(-step, 1) # backup a bit + current_pos = f.tell() + s = f.readline() + while s and current_pos < f.tell(): + if cnt*step >= f.tell(): # at 0? + f.seek(0) + break + f.seek(-cnt*step, 1) + s = f.readline() + cnt += 1 + pos = f.tell() # save the position ... + s = f.readline() # get the line + f.seek(pos) # ... 
and move the cursor back there + if not s: # definitely EOF (probably cannot happen) + return None + return syslog_ts(s) + +def is_our_log(s, node_l): + try: return s.split()[3] in node_l + except: return False +def log2node(log): + return os.path.basename(os.path.dirname(log)) +def filter(sl, log_l): + ''' + Filter list of messages to get only those from the given log + files list. + ''' + node_l = [log2node(x) for x in log_l if x] + return [x for x in sl if is_our_log(x, node_l)] +def convert_dt(dt): + try: return time.mktime(dt.timetuple()) + except: return None + +class LogSyslog(object): + ''' + Slice log, search log. + self.fp is an array of dicts. + ''' + def __init__(self, central_log, log_l, from_dt, to_dt): + self.log_l = log_l + self.central_log = central_log + self.f = {} + self.startpos = {} + self.endpos = {} + self.cache = {} + self.open_logs() + self.set_log_timeframe(from_dt, to_dt) + def open_log(self, log): + try: + self.f[log] = open(log) + except IOError,msg: + common_err("open %s: %s"%(log,msg)) + def open_logs(self): + if self.central_log: + self.open_log(self.central_log) + else: + for log in self.log_l: + self.open_log(log) + def set_log_timeframe(self, from_dt, to_dt): + ''' + Convert datetime to timestamps (i.e. seconds), then + find out start/end file positions. Logs need to be + already open. + ''' + self.from_ts = convert_dt(from_dt) + self.to_ts = convert_dt(to_dt) + bad_logs = [] + for log in self.f: + f = self.f[log] + start = log_seek(f, self.from_ts) + end = log_seek(f, self.to_ts, endpos = True) + if start == -1 or end == -1: + bad_logs.append(log) + else: + self.startpos[f] = start + self.endpos[f] = end + for log in bad_logs: + del self.f[log] + self.log_l.remove(log) + def get_match_line(self, f, patt): + ''' + Get first line from f that matches re_s, but is not + behind endpos[f]. 
+ ''' + while f.tell() < self.endpos[f]: + fpos = f.tell() + s = f.readline().rstrip() + if not patt or patt.search(s): + return s,fpos + return '',-1 + def single_log_list(self, f, patt): + l = [] + while True: + s = self.get_match_line(f, patt)[0] + if not s: + return l + l.append(s) + return l + def search_logs(self, log_l, re_s = ''): + ''' + Search logs for re_s and sort by time. + ''' + patt = None + if re_s: + patt = re.compile(re_s) + # if there's central log, there won't be merge + if self.central_log: + fl = [ self.f[f] for f in self.f ] + else: + fl = [ self.f[f] for f in self.f if self.f[f].name in log_l ] + for f in fl: + f.seek(self.startpos[f]) + # get head lines of all nodes + top_line = [ self.get_match_line(x, patt)[0] for x in fl ] + top_line_ts = [] + rm_idx_l = [] + # calculate time stamps for head lines + for i in range(len(top_line)): + if not top_line[i]: + rm_idx_l.append(i) + else: + top_line_ts.append(syslog_ts(top_line[i])) + # remove files with no matches found + rm_idx_l.reverse() + for i in rm_idx_l: + del fl[i],top_line[i] + common_debug("search <%s> in %s" % (re_s, [ f.name for f in fl ])) + if len(fl) == 0: # nothing matched ? + return [] + if len(fl) == 1: + # no need to merge if there's only one log + return [top_line[0]] + self.single_log_list(fl[0],patt) + # search through multiple logs, merge sorted by time + l = [] + first = 0 + while True: + for i in range(len(fl)): + try: + if i == first: + continue + if top_line_ts[i] and top_line_ts[i] < top_line_ts[first]: + first = i + except: pass + if not top_line[first]: + break + l.append(top_line[first]) + top_line[first] = self.get_match_line(fl[first], patt)[0] + if not top_line[first]: + top_line_ts[first] = time.time() + else: + top_line_ts[first] = syslog_ts(top_line[first]) + return l + def get_matches(self, re_l, log_l = []): + ''' + Return a list of log messages which + match one of the regexes in re_l. 
+ ''' + if not log_l: + log_l = self.log_l + re_s = '|'.join(re_l) + return filter(self.search_logs(log_l, re_s), log_l) + # caching is not ready! + # gets complicated because of different time frames + # (TODO) + #if not re_s: # just list logs + # return filter(self.search_logs(log_l), log_l) + #if re_s not in self.cache: # cache regex search + # self.cache[re_s] = self.search_logs(log_l, re_s) + #return filter(self.cache[re_s], log_l) + +def human_date(dt): + 'Some human date representation. Date defaults to now.' + if not dt: + dt = datetime.datetime.now() + # drop microseconds + return re.sub("[.].*","","%s %s" % (dt.date(),dt.time())) + +def is_log(p): + return os.path.isfile(p) and os.path.getsize(p) > 0 + +def pe_file_in_range(pe_f, a, ext): + r = re.search("pe-[^-]+-([0-9]+)[.]%s$" % ext, pe_f) + if not r: + return None + if not a or (a[0] <= int(r.group(1)) <= a[1]): + return pe_f + return None + +def read_log_info(log): + 'Read .info and return logfile and next pos' + s = file2str("%s.info" % log) + try: + logf,pos = s.split() + return logf, int(pos) + except: + warn_once("hb_report too old, you need to update cluster-glue") + return '',-1 + +def update_loginfo(rptlog, logfile, oldpos, appended_file): + 'Update .info with new next pos' + newpos = oldpos + os.stat(appended_file).st_size + try: + f = open("%s.info" % rptlog, "w") + f.write("%s %d\n" % (logfile, newpos)) + f.close() + except IOError, msg: + common_err("couldn't the update %s.info: %s" % (rptlog, msg)) + +class Report(Singleton): + ''' + A hb_report class. 
+ ''' + live_recent = 6*60*60 # recreate live hb_report once every 6 hours + short_live_recent = 60 # update once a minute + nodecolors = ( + "NORMAL", "GREEN", "CYAN", "MAGENTA", "YELLOW", "WHITE", "BLUE", "RED" + ) + def __init__(self): + self.source = None + self.loc = None + self.ready = False + self.from_dt = None + self.to_dt = None + self.detail = 0 + self.nodecolor = {} + self.logobj = None + self.desc = None + self.log_l = [] + self.central_log = None + self.cibgrp_d = {} + self.cibrsc_l = [] + self.cibnode_l = [] + self.setnodes = [] + self.outdir = os.path.join(vars.report_cache,"psshout") + self.errdir = os.path.join(vars.report_cache,"pssherr") + self.last_live_update = 0 + def error(self, s): + common_err("%s: %s" % (self.source, s)) + def warn(self, s): + common_warn("%s: %s" % (self.source, s)) + def rsc_list(self): + return self.cibgrp_d.keys() + self.cibrsc_l + def node_list(self): + return self.cibnode_l + def peinputs_list(self): + return [re.search("pe-[^-]+-([0-9]+)[.]bz2$", x).group(1) + for x in self._file_list("bz2")] + def unpack_report(self, tarball): + ''' + Unpack hb_report tarball. + Don't unpack if the directory already exists! 
+ ''' + bfname = os.path.basename(tarball) + parentdir = os.path.dirname(tarball) + common_debug("tarball: %s, in dir: %s" % (bfname,parentdir)) + if bfname.endswith(".tar.bz2"): + loc = tarball.replace(".tar.bz2","") + tar_unpack_option = "j" + elif bfname.endswith(".tar.gz"): # hmm, must be ancient + loc = tarball.replace(".tar.gz","") + tar_unpack_option = "z" + else: + self.error("this doesn't look like a report tarball") + return None + if os.path.isdir(loc): + return loc + cwd = os.getcwd() + try: + os.chdir(parentdir) + except OSError,msg: + self.error(msg) + return None + rc = ext_cmd_nosudo("tar -x%s < %s" % (tar_unpack_option,bfname)) + if self.source == "live": + os.remove(bfname) + os.chdir(cwd) + if rc != 0: + return None + return loc + def get_nodes(self): + return [ os.path.basename(p) + for p in os.listdir(self.loc) + if os.path.isdir(os.path.join(self.loc, p)) and + os.path.isfile(os.path.join(self.loc, p, "cib.txt")) + ] + def check_nodes(self): + 'Verify if the nodes in cib match the nodes in the report.' + nl = self.get_nodes() + if not nl: + self.error("no nodes in report") + return False + for n in self.cibnode_l: + if not (n in nl): + self.warn("node %s not in report" % n) + else: + nl.remove(n) + if nl: + self.warn("strange, extra node(s) %s in report" % ','.join(nl)) + return True + def check_report(self): + ''' + Check some basic properties of the report. + ''' + if not self.loc: + return False + if not os.access(self.desc, os.F_OK): + self.error("no description file in the report") + return False + if not self.check_nodes(): + return False + return True + def _live_loc(self): + return os.path.join(vars.report_cache,"live") + def is_last_live_recent(self): + ''' + Look at the last live hb_report. If it's recent enough, + return True. Return True also if self.to_dt is not empty + (not an open end report). 
+ ''' + try: + last_ts = os.stat(self.desc).st_mtime + return (time.time() - last_ts <= self.live_recent) + except Exception, msg: + self.warn(msg) + self.warn("strange, couldn't stat %s" % self.desc) + return False + def find_node_log(self, node): + p = os.path.join(self.loc, node) + if is_log(os.path.join(p, "ha-log.txt")): + return os.path.join(p, "ha-log.txt") + elif is_log(os.path.join(p, "messages")): + return os.path.join(p, "messages") + else: + return None + def find_central_log(self): + 'Return common log, if found.' + central_log = os.path.join(self.loc, "ha-log.txt") + if is_log(central_log): + logf, pos = read_log_info(central_log) + if logf.startswith("synthetic"): + # not really central log + return + common_debug("found central log %s" % logf) + self.central_log = central_log + def find_logs(self): + 'Return a list of logs found (one per node).' + l = [] + for node in self.get_nodes(): + log = self.find_node_log(node) + if log: + l.append(log) + else: + self.warn("no log found for node %s" % node) + self.log_l = l + def append_newlogs(self, a): + ''' + Append new logs fetched from nodes. + ''' + if not os.path.isdir(self.outdir): + return + for node,rptlog,logfile,nextpos in a: + fl = glob.glob("%s/*%s*" % (self.outdir,node)) + if not fl: + continue + append_file(rptlog,fl[0]) + update_loginfo(rptlog, logfile, nextpos, fl[0]) + def unpack_new_peinputs(self, a): + ''' + Untar PE inputs fetched from nodes. + ''' + if not os.path.isdir(self.outdir): + return + for node,pe_l in a: + fl = glob.glob("%s/*%s*" % (self.outdir,node)) + if not fl: + continue + u_dir = os.path.join(self.loc, node) + rc = ext_cmd_nosudo("tar -C %s -x < %s" % (u_dir,fl[0])) + def find_new_peinputs(self, a): + ''' + Get a list of pe inputs appearing in logs. 
+ ''' + if not os.path.isdir(self.outdir): + return [] + l = [] + for node,rptlog,logfile,nextpos in a: + node_l = [] + fl = glob.glob("%s/*%s*" % (self.outdir,node)) + if not fl: + continue + for s in file2list(fl[0]): + r = re.search(transition_patt[0], s) + if not r: + continue + node_l.append(r.group(1)) + if node_l: + common_debug("found new PE inputs %s at %s" % + ([os.path.basename(x) for x in node_l], node)) + l.append([node,node_l]) + return l + def update_live(self): + ''' + Update the existing live report, if it's older than + self.short_live_recent: + - append newer logs + - get new PE inputs + ''' + if (time.time() - self.last_live_update) <= self.short_live_recent: + return True + if _NO_PSSH: + warn_once("pssh not installed, slow live updates ahead") + return False + a = [] + common_info("fetching new logs, please wait ...") + for rptlog in self.log_l: + node = log2node(rptlog) + logf, pos = read_log_info(rptlog) + if logf: + a.append([node, rptlog, logf, pos]) + if not a: + common_info("no elligible logs found :(") + return False + rmdir_r(self.outdir) + rmdir_r(self.errdir) + rc1 = next_loglines(a, self.outdir, self.errdir) + self.append_newlogs(a) + pe_l = self.find_new_peinputs(a) + rmdir_r(self.outdir) + rmdir_r(self.errdir) + rc2 = True + if pe_l: + rc2 = next_peinputs(pe_l, self.outdir, self.errdir) + self.unpack_new_peinputs(pe_l) + self.logobj = None + rmdir_r(self.outdir) + rmdir_r(self.errdir) + self.last_live_update = time.time() + return (rc1 and rc2) + def get_live_report(self): + acquire_lock(vars.report_cache) + loc = self.new_live_hb_report() + release_lock(vars.report_cache) + return loc + def manage_live_report(self, force = False): + ''' + Update or create live report. 
+ ''' + d = self._live_loc() + if not d or not os.path.isdir(d): + return self.get_live_report() + if not self.loc: + # the live report is there, but we were just invoked + self.loc = d + self.report_setup() + if not force and self.is_last_live_recent(): + acquire_lock(vars.report_cache) + rc = self.update_live() + release_lock(vars.report_cache) + if rc: + return self._live_loc() + return self.get_live_report() + def new_live_hb_report(self): + ''' + Run hb_report to get logs now. + ''' + d = self._live_loc() + rmdir_r(d) + tarball = "%s.tar.bz2" % d + to_option = "" + if self.to_dt: + to_option = "-t '%s'" % human_date(self.to_dt) + nodes_option = "" + if self.setnodes: + nodes_option = "'-n %s'" % ' '.join(self.setnodes) + if ext_cmd_nosudo("mkdir -p %s" % os.path.dirname(d)) != 0: + return None + common_info("retrieving information from cluster nodes, please wait ...") + rc = ext_cmd_nosudo("hb_report -f '%s' %s %s %s" % + (self.from_dt.ctime(), to_option, nodes_option, d)) + if rc != 0: + if os.path.isfile(tarball): + self.warn("hb_report thinks it failed, proceeding anyway") + else: + self.error("hb_report failed") + return None + self.last_live_update = time.time() + return self.unpack_report(tarball) + def reset_period(self): + self.from_dt = None + self.to_dt = None + def set_source(self,src): + 'Set our source.' + self.source = src + def set_period(self,from_dt,to_dt): + ''' + Set from/to_dt. 
+ ''' + common_debug("setting report times: <%s> - <%s>" % (from_dt,to_dt)) + if not self.from_dt: + self.from_dt = from_dt + self.to_dt = to_dt + elif self.source != "live": + if self.from_dt > from_dt: + self.error("from time %s not within report" % from_dt) + return False + if to_dt and self.to_dt < to_dt: + self.error("end time %s not within report" % to_dt) + return False + self.from_dt = from_dt + self.to_dt = to_dt + else: + need_ref = (self.from_dt > from_dt or \ + (to_dt and self.to_dt < to_dt)) + self.from_dt = from_dt + self.to_dt = to_dt + if need_ref: + self.refresh_source(force = True) + if self.logobj: + self.logobj.set_log_timeframe(self.from_dt, self.to_dt) + def set_detail(self,detail_lvl): + ''' + Set the detail level. + ''' + self.detail = int(detail_lvl) + def set_nodes(self,*args): + ''' + Allow user to set the node list (necessary if the host is + not part of the cluster). + ''' + self.setnodes = args + def read_cib(self): + ''' + Get some information from the report's CIB (node list, + resource list, groups). If "live" and not central log, + then use cibadmin. + ''' + nl = self.get_nodes() + if not nl: + return + if self.source == "live" and not self.central_log: + doc = cibdump2doc() + else: + doc = file2doc(os.path.join(self.loc,nl[0],"cib.xml")) + if not doc: + return # no cib? + try: conf = doc.getElementsByTagName("configuration")[0] + except: # bad cib? 
+ return + self.cibrsc_l = [ x.getAttribute("id") + for x in conf.getElementsByTagName("primitive") ] + self.cibnode_l = [ x.getAttribute("uname") + for x in conf.getElementsByTagName("node") ] + for grp in conf.getElementsByTagName("group"): + self.cibgrp_d[grp.getAttribute("id")] = get_rsc_children_ids(grp) + def set_node_colors(self): + i = 0 + for n in self.cibnode_l: + self.nodecolor[n] = self.nodecolors[i] + i = (i+1) % len(self.nodecolors) + def report_setup(self): + if not self.loc: + return + self.desc = os.path.join(self.loc,"description.txt") + self.find_logs() + self.find_central_log() + self.read_cib() + self.set_node_colors() + self.logobj = None + def prepare_source(self): + ''' + Unpack a hb_report tarball. + For "live", create an ad-hoc hb_report and unpack it + somewhere in the cache area. + Parse the period. + ''' + if self.ready and self.source != "live": + return True + if self.source == "live": + self.loc = self.manage_live_report() + elif os.path.isfile(self.source): + self.loc = self.unpack_report(self.source) + elif os.path.isdir(self.source): + self.loc = self.source + if not self.loc: + return False + self.report_setup() + self.ready = self.check_report() + return self.ready + def refresh_source(self, force = False): + ''' + Refresh report from live. + ''' + if self.source != "live": + self.error("refresh not supported") + return False + self.loc = self.manage_live_report(force) + self.report_setup() + self.ready = self.check_report() + return self.ready + def get_patt_l(self,type): + ''' + get the list of patterns for this type, up to and + including current detail level + ''' + if not type in log_patterns: + common_error("%s not featured in log patterns" % type) + return None + return log_patterns[type][0:self.detail+1] + def build_re(self,type,args): + ''' + Prepare a regex string for the type and args. + For instance, "resource" and rsc1, rsc2, ... 
+ ''' + patt_l = self.get_patt_l(type) + if not patt_l: + return None + if not args: + re_l = mk_re_list(patt_l,"") + else: + re_l = mk_re_list(patt_l,r'(%s)\W' % "|".join(args)) + return re_l + def disp(self, s): + 'color output' + a = s.split() + try: clr = self.nodecolor[a[3]] + except: return s + return termctrl.render("${%s}%s${NORMAL}" % (clr,s)) + def show_logs(self, log_l = [], re_l = []): + ''' + Print log lines, either matched by re_l or all. + ''' + if not log_l: + log_l = self.log_l + if not self.central_log and not log_l: + self.error("no logs found") + return + if not self.logobj: + self.logobj = LogSyslog(self.central_log, log_l, \ + self.from_dt, self.to_dt) + l = self.logobj.get_matches(re_l, log_l) + if not options.batch and sys.stdout.isatty(): + page_string('\n'.join([ self.disp(x) for x in l ])) + else: # raw output + try: # in case user quits the next prog in pipe + for s in l: print s + except IOError, msg: + if not ("Broken pipe" in msg): + common_err(msg) + def match_args(self, cib_l, args): + for a in args: + a_clone = re.sub(r':.*', '', a) + if not (a in cib_l) and not (a_clone in cib_l): + self.warn("%s not found in report, proceeding anyway" % a) + def get_desc_line(self,fld): + try: + f = open(self.desc) + except IOError,msg: + common_err("open %s: %s"%(self.desc,msg)) + return + for s in f: + if s.startswith("%s: " % fld): + f.close() + s = s.replace("%s: " % fld,"").rstrip() + return s + f.close() + def info(self): + ''' + Print information about the source. 
+ ''' + if not self.prepare_source(): + return False + print "Source: %s" % self.source + if self.source != "live": + print "Created:", self.get_desc_line("Date") + print "By:", self.get_desc_line("By") + print "Period: %s - %s" % \ + ((self.from_dt and human_date(self.from_dt) or "start"), + (self.to_dt and human_date(self.to_dt) or "end")) + print "Nodes:",' '.join(self.cibnode_l) + print "Groups:",' '.join(self.cibgrp_d.keys()) + print "Resources:",' '.join(self.cibrsc_l) + def latest(self): + ''' + Get the very latest cluster events, basically from the + latest transition. + Some transitions may be empty though. + ''' + def events(self): + ''' + Show all events. + ''' + if not self.prepare_source(): + return False + all_re_l = self.build_re("resource",self.cibrsc_l) + \ + self.build_re("node",self.cibnode_l) + if not all_re_l: + self.error("no resources or nodes found") + return False + self.show_logs(re_l = all_re_l) + def resource(self,*args): + ''' + Show resource relevant logs. + ''' + if not self.prepare_source(): + return False + # expand groups (if any) + expanded_l = [] + for a in args: + if a in self.cibgrp_d: + expanded_l += self.cibgrp_d[a] + else: + expanded_l.append(a) + self.match_args(self.cibrsc_l,expanded_l) + rsc_re_l = self.build_re("resource",expanded_l) + if not rsc_re_l: + return False + self.show_logs(re_l = rsc_re_l) + def node(self,*args): + ''' + Show node relevant logs. + ''' + if not self.prepare_source(): + return False + self.match_args(self.cibnode_l,args) + node_re_l = self.build_re("node",args) + if not node_re_l: + return False + self.show_logs(re_l = node_re_l) + def log(self,*args): + ''' + Show logs for a node or all nodes. 
+ ''' + if not self.prepare_source(): + return False + if not args: + self.show_logs() + else: + l = [] + for n in args: + if n not in self.cibnode_l: + self.warn("%s: no such node" % n) + continue + l.append(self.find_node_log(n)) + if not l: + return False + self.show_logs(log_l = l) + def _file_list(self, ext, a = []): + ''' + Return list of PE (or dot) files (abs paths) sorted by + mtime. + Input is a number or a pair of numbers representing + range. Otherwise, all matching files are returned. + ''' + if not self.prepare_source(): + return [] + if not isinstance(a,(tuple,list)) and a is not None: + a = [a,a] + return sort_by_mtime([x for x in dirwalk(self.loc) \ + if pe_file_in_range(x,a,ext)]) + def pelist(self, a = []): + return self._file_list("bz2", a) + def dotlist(self, a = []): + return self._file_list("dot", a) + def find_file(self, f): + return file_find_by_name(self.loc, f) + +vars = Vars.getInstance() +options = Options.getInstance() +termctrl = TerminalController.getInstance() +cib_factory = CibFactory.getInstance() +crm_report = Report.getInstance() +# vim:ts=4:sw=4:et: diff -r 9609937061d7 -r b694b75d2e33 shell/modules/ui.py.in --- a/shell/modules/ui.py.in Fri Jul 15 10:51:00 2011 +1000 +++ b/shell/modules/ui.py.in Mon Jul 18 12:35:57 2011 +0200 @@ -27,6 +27,7 @@ from vars import Vars from levels import Levels from cibconfig import mkset_obj, CibFactory from cibstatus import CibStatus +from report import Report from template import LoadTemplate from cliformat import nvpairs2list from ra import * @@ -1390,6 +1391,7 @@ cluster. self.cmd_table["cib"] = CibShadow self.cmd_table["cibstatus"] = StatusMgmt self.cmd_table["template"] = Template + self.cmd_table["history"] = History self.cmd_table["_test"] = (self.check_structure,(0,0),1,0) self.cmd_table["_regtest"] = (self.regression_testing,(1,1),1,0) self.cmd_table["_objects"] = (self.showobjects,(0,0),1,0) @@ -1661,6 +1663,227 @@ cluster. 
self.commit("commit") cib_factory.reset() +class History(UserInterface): + ''' + The history class + ''' + lvl_name = "history" + desc_short = "CRM cluster history" + desc_long = """ +The history level. + +Examine Pacemaker's history: node and resource events, logs. +""" + def __init__(self): + UserInterface.__init__(self) + self.cmd_table["source"] = (self.source,(1,1),1,0) + self.cmd_table["limit"] = (self.limit,(1,2),1,0) + self.cmd_table["refresh"] = (self.refresh,(0,1),1,0) + self.cmd_table["detail"] = (self.detail,(1,1),1,0) + self.cmd_table["setnodes"] = (self.setnodes,(1,),1,0) + self.cmd_table["info"] = (self.info,(0,0),1,0) + self.cmd_table["latest"] = (self.latest,(0,0),1,0) + self.cmd_table["resource"] = (self.resource,(1,),1,0) + self.cmd_table["node"] = (self.node,(1,),1,1) + self.cmd_table["log"] = (self.log,(0,),1,0) + self.cmd_table["peinputs"] = (self.peinputs,(0,),1,0) + self._set_source(options.history) + def _no_source(self): + common_error("we have no source set yet! please use the source command") + def parse_time(self, t): + ''' + Try to make sense of the user provided time spec. + Use dateutil if available, otherwise strptime. + Return the datetime value. 
+ ''' + try: + import dateutil.parser + dt = dateutil.parser.parse(t) + except ValueError,msg: + common_err("%s: %s" % (t,msg)) + return None + except ImportError,msg: + import datetime + try: + tm = time.strptime(t) + dt = datetime.datetime(*tm[0:7]) + except ValueError,msg: + common_err("no dateutil, please provide times as printed by date(1)") + return None + return dt + def _set_period(self,from_time,to_time = ''): + ''' + parse time specs and set period + ''' + from_dt = self.parse_time(from_time) + if not from_dt: + return False + to_dt = None + if to_time: + to_dt = self.parse_time(to_time) + if not to_dt: + return False + if to_dt and to_dt <= from_dt: + common_err("%s - %s: bad period" % from_time, to_time) + return False + common_debug("setting period to <%s>:<%s>" % (from_dt,to_dt)) + return crm_report.set_period(from_dt,to_dt) + def _check_source(self,src): + 'a (very) quick source check' + if src == "live" or os.path.isfile(src) or os.path.isdir(src): + return True + else: + common_error("source %s doesn't exist" % src) + return False + def _set_source(self,src,live_from_time = None): + ''' + Have the last history source survive the History + and Report instances + ''' + common_debug("setting source to %s" % src) + if not self._check_source(src): + return False + crm_report.set_source(src) + options.history = src + options.report_to_time = '' + rc = True + if src == "live": + options.report_from_time = time.ctime(live_from_time and \ + live_from_time or (time.time() - 60*60)) + rc = self._set_period(\ + options.report_from_time, options.report_to_time) + else: + options.report_from_time = '' + return rc + def source(self,cmd,src = None): + "usage: source {||live}" + if src != options.history: + return self._set_source(src) + def limit(self,cmd,from_time,to_time = ''): + "usage: limit []" + if options.report_from_time != from_time or \ + options.report_to_time != to_time: + if not self._set_period(from_time, to_time): + return False + 
options.report_from_time = from_time + options.report_to_time = to_time + def refresh(self, cmd, force = ''): + "usage: refresh" + if options.history != "live": + common_info("nothing to refresh if source isn't live") + return False + if force: + if force != "force" and force != "--force": + syntax_err((cmd,force), context = 'refresh') + return False + force = True + return crm_report.refresh_source(force) + def detail(self,cmd,detail_lvl): + "usage: detail " + detail_num = convert2ints(detail_lvl) + if not (isinstance(detail_num,int) and int(detail_num) >= 0): + bad_usage(cmd,detail_lvl) + return False + return crm_report.set_detail(detail_lvl) + def setnodes(self,cmd,*args): + "usage: setnodes [ ...]" + if options.history != "live": + common_info("setting nodes not necessary for existing reports, proceeding anyway") + return crm_report.set_nodes(*args) + def info(self,cmd): + "usage: info" + return crm_report.info() + def latest(self,cmd): + "usage: latest" + try: + prev_level = levels.previous().myname() + except: + prev_level = '' + if prev_level != "cibconfig": + common_err("%s is available only when invoked from configure" % cmd) + return False + ts = cib_factory.last_commit_at() + if not ts: + common_err("no last commit time found") + return False + if not wait4dc("transition", not options.batch): + return False + self._set_source("live", ts) + crm_report.refresh_source() + return crm_report.events() + def resource(self,cmd,*args): + "usage: resource [ ...]" + return crm_report.resource(*args) + def node(self,cmd,*args): + "usage: node [ ...]" + return crm_report.node(*args) + def log(self,cmd,*args): + "usage: log [ ...]" + return crm_report.log(*args) + def ptest(self, nograph, scores, utilization, actions, verbosity): + 'Send a decompressed self.pe_file to ptest' + try: + f = open(self.pe_file) + except IOError,msg: + common_err("open: %s"%msg) + return False + s = bz2.decompress(''.join(f)) + f.close() + return run_ptest(s, nograph, scores, utilization, 
actions, verbosity) + def peinputs(self,cmd,subcmd,*args): + """usage: peinputs list [{|} ...] + peinputs get [{|} ...] + peinputs show [] [nograph] [v...] [scores] [actions] [utilization] + peinputs showdot []""" + if subcmd in ("get","list"): + if args: + l = [] + for s in args: + a = convert2ints(s.split(':')) + if len(a) == 2 and not check_range(a): + common_err("%s: invalid peinputs range" % a) + return False + l += crm_report.pelist(a) + else: + l = crm_report.pelist() + if not l: return False + if subcmd == "list": + s = get_stdout("ls -lrt %s" % ' '.join(l)) + page_string(s) + else: + print '\n'.join(l) + elif subcmd in ("show","showdot"): + try: n = convert2ints(args[0]) + except: n = None + startarg = 1 + if n is None: + idx = -1 + startarg = 0 # peinput number missing + elif n <= 0: + idx = n - 1 + n = [] # to get all peinputs + else: + idx = 0 + if subcmd == "showdot": + if not user_prefs.dotty: + common_err("install graphviz to draw transition graphs") + return False + l = crm_report.dotlist(n) + else: + l = crm_report.pelist(n) + if len(l) < abs(idx): + common_err("pe input or dot file not found") + return False + if subcmd == "show": + self.pe_file = l[idx] + return ptestlike(self.ptest,'vv',"%s %s" % \ + (cmd, subcmd), *args[startarg:]) + else: + show_dot_graph(l[idx]) + else: + bad_usage(cmd,' '.join(subcmd,args)) + return False + class TopLevel(UserInterface): ''' The top level. 
@@ -1681,6 +1904,7 @@ class TopLevel(UserInterface): self.cmd_table['configure'] = CibConfig self.cmd_table['node'] = NodeMgmt self.cmd_table['options'] = CliOptions + self.cmd_table['history'] = History self.cmd_table['status'] = (self.status,(0,5),0,0) self.cmd_table['ra'] = RA setup_aliases(self) @@ -1726,5 +1950,5 @@ vars = Vars.getInstance() levels = Levels.getInstance(TopLevel) cib_status = CibStatus.getInstance() cib_factory = CibFactory.getInstance() - +crm_report = Report.getInstance() # vim:ts=4:sw=4:et: diff -r 9609937061d7 -r b694b75d2e33 shell/modules/userprefs.py --- a/shell/modules/userprefs.py Fri Jul 15 10:51:00 2011 +1000 +++ b/shell/modules/userprefs.py Mon Jul 18 12:35:57 2011 +0200 @@ -18,6 +18,7 @@ from os import getenv import subprocess import sys +import datetime, time from singletonmixin import Singleton from term import TerminalController @@ -26,6 +27,10 @@ class Options(Singleton): interactive = False batch = False regression_tests = False + history = "live" + # now minus one hour + report_from_time = time.ctime(time.time()-60*60) + report_to_time = "" options = Options.getInstance() termctrl = TerminalController.getInstance() diff -r 9609937061d7 -r b694b75d2e33 shell/modules/utils.py --- a/shell/modules/utils.py Fri Jul 15 10:51:00 2011 +1000 +++ b/shell/modules/utils.py Mon Jul 18 12:35:57 2011 +0200 @@ -22,6 +22,7 @@ import subprocess import re import glob import time +import shutil from userprefs import Options, UserPrefs from vars import Vars @@ -177,6 +178,29 @@ def str2file(s,fname): f.write(s) f.close() return True +def file2str(fname, noerr = True): + ''' + Read a one line file into a string, strip whitespace around. + ''' + try: f = open(fname,"r") + except IOError, msg: + if not noerr: + common_err(msg) + return None + s = f.readline() + f.close() + return s.strip() +def file2list(fname): + ''' + Read a file into a list (newlines dropped). 
+ ''' + try: f = open(fname,"r") + except IOError, msg: + common_err(msg) + return None + l = ''.join(f).split('\n') + f.close() + return l def is_filename_sane(name): if re.search("['`/#*?$\[\]]",name): @@ -203,9 +227,70 @@ def ext_cmd(cmd): print ".EXT", cmd return subprocess.call(add_sudo(cmd), shell=True) -def get_stdout(cmd, stderr_on = True): +def rmdir_r(d): + if d and os.path.isdir(d): + shutil.rmtree(d) + +_LOCKDIR = ".lockdir" +_PIDF = "pid" +def check_locker(dir): + if not os.path.isdir(os.path.join(dir,_LOCKDIR)): + return + s = file2str(os.path.join(dir,_LOCKDIR,_PIDF)) + pid = convert2ints(s) + if not isinstance(pid,int): + common_warn("history: removing malformed lock") + rmdir_r(os.path.join(dir,_LOCKDIR)) + return + try: + os.kill(pid, 0) + except OSError, err: + if err.errno == os.errno.ESRCH: + common_info("history: removing stale lock") + rmdir_r(os.path.join(dir,_LOCKDIR)) + else: + common_err("%s: %s" % (_LOCKDIR,err.message)) +def acquire_lock(dir): + check_locker(dir) + while True: + try: + os.mkdir(os.path.join(dir,_LOCKDIR)) + str2file("%d" % os.getpid(),os.path.join(dir,_LOCKDIR,_PIDF)) + return True + except OSError, e: + if e.errno != os.errno.EEXIST: + common_err("%s" % e.message) + return False + time.sleep(0.1) + continue + else: + return False +def release_lock(dir): + rmdir_r(os.path.join(dir,_LOCKDIR)) + +#def ext_cmd_nosudo(cmd): +# if options.regression_tests: +# print ".EXT", cmd +# return subprocess.call(cmd, shell=True) + +def ext_cmd_nosudo(cmd): + if options.regression_tests: + print ".EXT", cmd + proc = subprocess.Popen(cmd, shell = True, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + (outp,err_outp) = proc.communicate() + proc.wait() + rc = proc.returncode + if rc != 0: + print outp + print err_outp + return rc + +def get_stdout(cmd, input_s = None, stderr_on = True): ''' - Run a cmd, return stdin output. + Run a cmd, return stdout output. + Optional input string "input_s". 
stderr_on controls whether to show output which comes on stderr. ''' if stderr_on: @@ -213,8 +298,9 @@ def get_stdout(cmd, stderr_on = True): else: stderr = subprocess.PIPE proc = subprocess.Popen(cmd, shell = True, \ + stdin = subprocess.PIPE, \ stdout = subprocess.PIPE, stderr = stderr) - outp = proc.communicate()[0] + outp = proc.communicate(input_s)[0] proc.wait() outp = outp.strip() return outp @@ -223,9 +309,27 @@ def stdout2list(cmd, stderr_on = True): Run a cmd, fetch output, return it as a list of lines. stderr_on controls whether to show output which comes on stderr. ''' - s = get_stdout(add_sudo(cmd), stderr_on) + s = get_stdout(add_sudo(cmd), stderr_on = stderr_on) return s.split('\n') +def append_file(dest,src): + 'Append src to dest' + try: + dest_f = open(dest,"a") + except IOError,msg: + common_err("open %s: %s" % (dest, msg)) + return False + try: + f = open(src) + except IOError,msg: + common_err("open %s: %s" % (src, msg)) + dest_f.close() + return False + dest_f.write(''.join(f)) + f.close() + dest_f.close() + return True + def wait4dc(what = "", show_progress = True): ''' Wait for the DC to get into the S_IDLE state. This should be @@ -283,6 +387,38 @@ def wait4dc(what = "", show_progress = T if cnt % 5 == 0: sys.stderr.write(".") +def run_ptest(graph_s, nograph, scores, utilization, actions, verbosity): + ''' + Pipe graph_s thru ptest(8). Show graph using dotty if requested. 
+ ''' + actions_filter = "grep LogActions: | grep -vw Leave" + ptest = "ptest -X" + if verbosity: + if actions: + verbosity = 'v' * max(3,len(verbosity)) + ptest = "%s -%s" % (ptest,verbosity.upper()) + if scores: + ptest = "%s -s" % ptest + if utilization: + ptest = "%s -U" % ptest + if user_prefs.dotty and not nograph: + fd,dotfile = mkstemp() + ptest = "%s -D %s" % (ptest,dotfile) + else: + dotfile = None + # ptest prints to stderr + if actions: + ptest = "%s 2>&1 | %s" % (ptest, actions_filter) + print get_stdout(ptest, input_s = graph_s) + #page_string(get_stdout(ptest, input_s = graph_s)) + if dotfile: + show_dot_graph(dotfile) + vars.tmpfiles.append(dotfile) + else: + if not nograph: + common_info("install graphviz to see a transition graph") + return True + def is_id_valid(id): """ Verify that the id follows the definition: @@ -299,6 +435,57 @@ def check_filename(fname): fname_re = "^[^/]+$" return re.match(fname_re,id) +def check_range(a): + """ + Verify that the integer range in list a is valid. + """ + if len(a) != 2: + return False + if not isinstance(a[0],int) or not isinstance(a[1],int): + return False + return (int(a[0]) < int(a[1])) + +def sort_by_mtime(l): + 'Sort a (small) list of files by time mod.' + l2 = [(os.stat(x).st_mtime, x) for x in l] + l2.sort() + return [x[1] for x in l2] +def dirwalk(dir): + "walk a directory tree, using a generator" + # http://code.activestate.com/recipes/105873/ + for f in os.listdir(dir): + fullpath = os.path.join(dir,f) + if os.path.isdir(fullpath) and not os.path.islink(fullpath): + for x in dirwalk(fullpath): # recurse into subdir + yield x + else: + yield fullpath +def file_find_by_name(dir, fname): + 'Find a file within a tree matching fname.' 
+ if not dir: + common_err("cannot dirwalk nothing!") + return None + if not fname: + common_err("file to find not provided") + return None + for f in dirwalk(dir): + if os.path.basename(f) == fname: + return f + return None + +def convert2ints(l): + """ + Convert a list of strings (or a string) to a list of ints. + All strings must be ints, otherwise conversion fails and None + is returned! + """ + try: + if isinstance(l,(tuple,list)): + return [int(x) for x in l] + else: # it's a string then + return int(l) + except: return None + def is_process(s): proc = subprocess.Popen("ps -e -o pid,command | grep -qs '%s'" % s, \ shell=True, stdout=subprocess.PIPE) diff -r 9609937061d7 -r b694b75d2e33 shell/modules/vars.py.in --- a/shell/modules/vars.py.in Fri Jul 15 10:51:00 2011 +1000 +++ b/shell/modules/vars.py.in Mon Jul 18 12:35:57 2011 +0200 @@ -186,6 +186,7 @@ class Vars(Singleton): hist_file = os.path.join(homedir,".crm_history") rc_file = os.path.join(homedir,".crm.rc") tmpl_conf_dir = os.path.join(homedir,".crmconf") + report_cache = os.path.join("@CRM_CACHE_DIR@","history") tmpl_dir = "@datadir@/@PACKAGE@/templates" pe_dir = "@PE_STATE_DIR@" crm_conf_dir = "@CRM_CONFIG_DIR@"