#!/usr/bin/python
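"""Ingest relevant OBS and annotation data into InfluxDB to generate metrics.

Example invocation (illustrative; assumes a local InfluxDB instance, an osc
configuration providing OBS credentials, and this file saved as metrics.py):

    ./metrics.py --project openSUSE:Factory --host localhost --port 8086
"""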
import argparse
from collections import namedtuple
from datetime import datetime
from dateutil.parser import parse as date_parse
from influxdb import InfluxDBClient
from lxml import etree as ET
import os
import subprocess
import sys
import yaml

import metrics_release
import osc.conf
import osc.core
import osclib.conf
from osclib.cache import Cache
from osclib.conf import Config
from osclib.stagingapi import StagingAPI

SOURCE_DIR = os.path.dirname(os.path.realpath(__file__))

Point = namedtuple('Point', ['measurement', 'tags', 'fields', 'time', 'delta'])

# Duplicate Leap config to handle 13.2 without issue.
osclib.conf.DEFAULT[
    r'openSUSE:(?P<project>[\d.]+)'] = osclib.conf.DEFAULT[
    r'openSUSE:(?P<project>Leap:(?P<version>[\d.]+))']
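
# (openSUSE:13.2 style project names lack the "Leap:" prefix, so bare-version
# projects reuse the Leap defaults above.)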

# Provide an osc.core.get_request_list() wrapper that swaps out the search()
# implementation to capture the generated query, then paginates over and
# yields each request to avoid loading all requests at the same time.
# Additionally, use lxml ET to avoid having to re-parse to perform complex
# xpaths.
def get_request_list(*args, **kwargs):
    osc.core._search = osc.core.search
    osc.core.search = search_capture
    osc.core._ET = osc.core.ET
    osc.core.ET = ET

    osc.core.get_request_list(*args, **kwargs)

    osc.core.search = osc.core._search

    query = search_capture.query
    for request in search_paginated_generator(query[0], query[1], **query[2]):
        # Python 3 yield from.
        yield request

    osc.core.ET = osc.core._ET

def search_capture(apiurl, queries=None, **kwargs):
    # Record the query arguments and return an empty collection so the wrapped
    # osc.core.get_request_list() call returns without processing anything.
    search_capture.query = (apiurl, queries, kwargs)
    return {'request': ET.fromstring('<collection matches="0"></collection>')}

# Provides an osc.core.search() implementation for use with get_request_list()
# that paginates in sets of 1000 and yields each request.
def search_paginated_generator(apiurl, queries=None, **kwargs):
    if "submit/target/@project='openSUSE:Factory'" in kwargs['request']:
        kwargs['request'] = osc.core.xpath_join(kwargs['request'], '@id>250000', op='and')

    request_count = 0
    queries['request']['limit'] = 1000
    queries['request']['offset'] = 0
    while True:
        collection = osc.core.search(apiurl, queries, **kwargs)['request']
        if not request_count:
            print('processing {:,} requests'.format(int(collection.get('matches'))))

        for request in collection.findall('request'):
            yield request
            request_count += 1

        if request_count == int(collection.get('matches')):
            # Stop paging once the expected number of items has been returned.
            break

        # Release memory as otherwise ET seems to hold onto it.
        collection.clear()

        queries['request']['offset'] += queries['request']['limit']
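
# point() accumulates Point tuples here; walk_points() later sorts them by
# time, merges entries that share a timestamp, sums delta points into running
# counters, and writes the result to InfluxDB in batches.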
points = []

def point(measurement, fields, datetime, tags={}, delta=False):
    global points
    points.append(Point(measurement, tags, fields, timestamp(datetime), delta))

def timestamp(datetime):
    # Note: '%s' is passed through to the platform strftime(); it is a glibc
    # extension yielding seconds since the Unix epoch, which matches the 's'
    # precision used when writing points to InfluxDB.
    return int(datetime.strftime('%s'))
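
# Convert each finalized request targeting the project into measurement points
# covering totals, staging selections, reviews, and priority changes.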
def ingest_requests(api, project):
    requests = get_request_list(api.apiurl, project,
                                req_state=('accepted', 'revoked', 'superseded'),
                                exclude_target_projects=[project],
                                withfullhistory=True)
    for request in requests:
        if request.find('action').get('type') not in ('submit', 'delete'):
            # TODO Handle non-stageable requests via different flow.
            continue

        created_at = date_parse(request.find('history').get('when'))
        final_at = date_parse(request.find('state').get('when'))
        final_at_history = date_parse(request.find('history[last()]').get('when'))
        if final_at_history > final_at:
            # Workaround for invalid dates: openSUSE/open-build-service#3858.
            final_at = final_at_history

        # TODO Track requests in pseudo-ignore state.
        point('total', {'backlog': 1, 'open': 1}, created_at, {'event': 'create'}, True)
        point('total', {'backlog': -1, 'open': -1}, final_at, {'event': 'close'}, True)

        request_tags = {}
        request_fields = {
            'total': (final_at - created_at).total_seconds(),
            'staged_count': len(request.findall('review[@by_group="factory-staging"]/history')),
        }

        # TODO Total time spent in backlog (i.e. factory-staging, but excluding when staged).

        staged_first_review = request.xpath('review[contains(@by_project, "{}:Staging:")]'.format(project))
        if len(staged_first_review):
            by_project = staged_first_review[0].get('by_project')
            request_tags['type'] = 'adi' if api.is_adi_project(by_project) else 'letter'

            # TODO Determine current whitelist state based on dashboard revisions.
            if project.startswith('openSUSE:Factory'):
                splitter_whitelist = 'B C D E F G H I J'.split()
                if splitter_whitelist:
                    short = api.extract_staging_short(by_project)
                    request_tags['whitelisted'] = short in splitter_whitelist
            else:
                # All letter stagings were whitelisted since there was no restriction.
                request_tags['whitelisted'] = request_tags['type'] == 'letter'

        ready_to_accept = request.xpath('review[contains(@by_project, "{}:Staging:adi:") and @state="accepted"]/history[comment[text() = "ready to accept"]]/@when'.format(project))
        if len(ready_to_accept):
            ready_to_accept = date_parse(ready_to_accept[0])
            request_fields['ready'] = (final_at - ready_to_accept).total_seconds()

            # TODO Points with identical timestamps are merged, so this could be placed in the
            # total measurement, but it may make sense to keep this separate and make the others follow.
            point('ready', {'count': 1}, ready_to_accept, delta=True)
            point('ready', {'count': -1}, final_at, delta=True)

        staged_first = request.xpath('review[@by_group="factory-staging"]/history/@when')
        if len(staged_first):
            staged_first = date_parse(staged_first[0])
            request_fields['staged_first'] = (staged_first - created_at).total_seconds()

            # TODO Decide if better to break out all measurements by time most relevant to event,
            # time request was created, or time request was finalized. It may also make sense to
            # keep separate measurement by different times like this one.
            point('request_staged_first', {'value': request_fields['staged_first']}, staged_first, request_tags)

        point('request', request_fields, final_at, request_tags)

        # Staging related reviews.
        for number, review in enumerate(
                request.xpath('review[contains(@by_project, "{}:Staging:")]'.format(project)), start=1):
            staged_at = date_parse(review.get('when'))

            project_type = 'adi' if api.is_adi_project(review.get('by_project')) else 'letter'
            short = api.extract_staging_short(review.get('by_project'))
            point('staging', {'count': 1}, staged_at,
                  {'id': short, 'type': project_type, 'event': 'select'}, True)
            point('total', {'backlog': -1, 'staged': 1}, staged_at, {'event': 'select'}, True)

            who = who_workaround(request, review)
            review_tags = {'event': 'select', 'user': who, 'number': number}
            review_tags.update(request_tags)
            point('user', {'count': 1}, staged_at, review_tags)

            history = review.find('history')
            if history is not None:
                unselected_at = date_parse(history.get('when'))
            else:
                unselected_at = final_at

            # If a request is declined and re-opened it must be repaired before being re-staged. At
            # which point the only possible open review should be the final one.
            point('staging', {'count': -1}, unselected_at,
                  {'id': short, 'type': project_type, 'event': 'unselect'}, True)
            point('total', {'backlog': 1, 'staged': -1}, unselected_at, {'event': 'unselect'}, True)

        # No-staging related reviews.
        for review in request.xpath('review[not(contains(@by_project, "{}:Staging:"))]'.format(project)):
            tags = {
                # who_added is non-trivial due to openSUSE/open-build-service#3898.
                'state': review.get('state'),
            }

            opened_at = date_parse(review.get('when'))
            history = review.find('history')
            if history is not None:
                completed_at = date_parse(history.get('when'))
                tags['who_completed'] = history.get('who')
            else:
                completed_at = final_at
                # Does not seem to make sense to mirror user responsible for making final state
                # change as the user who completed the review.

            tags['key'] = []
            tags['type'] = []
            for name, value in sorted(review.items(), reverse=True):
                if name.startswith('by_'):
                    tags[name] = value
                    tags['key'].append(value)
                    tags['type'].append(name[3:])
            tags['type'] = '_'.join(tags['type'])

            point('review', {'open_for': (completed_at - opened_at).total_seconds()}, completed_at, tags)
            point('review_count', {'count': 1}, opened_at, tags, True)
            point('review_count', {'count': -1}, completed_at, tags, True)

        found = []
        for set_priority in request.xpath('history[description[contains(text(), "Request got a new priority:")]]'):
            parts = set_priority.find('description').text.rsplit(' ', 3)
            priority_previous = parts[1]
            priority = parts[3]
            if priority == priority_previous:
                continue

            changed_at = date_parse(set_priority.get('when'))
            if priority_previous != 'moderate':
                point('priority', {'count': -1}, changed_at, {'level': priority_previous}, True)
            if priority != 'moderate':
                point('priority', {'count': 1}, changed_at, {'level': priority}, True)
                found.append(priority)

        # Ensure a final removal entry is created when request is finalized.
        priority = request.find('priority')
        if priority is not None and priority.text != 'moderate':
            if priority.text in found:
                point('priority', {'count': -1}, final_at, {'level': priority.text}, True)
            else:
                print('unable to find priority history entry for {} to {}'.format(
                    request.get('id'), priority.text))

    print('finalizing {:,} points'.format(len(points)))
    return walk_points(points, project)

def who_workaround(request, review, relax=False):
    # Super ugly workaround for incorrect and missing data:
    # - openSUSE/open-build-service#3857
    # - openSUSE/open-build-service#3898
    global who_workaround_swap, who_workaround_miss

    who = review.get('who')  # All that should be required (used as fallback).
    when = review.get('when')
    if relax:
        # Super hack: chop off the seconds to relax matching in hopes of finding a potential match.
        when = when[:-2]

    who_real = request.xpath(
        'history[contains(@when, "{}") and comment[contains(text(), "{}")]]/@who'.format(
            when, review.get('by_project')))
    if len(who_real):
        who = who_real[0]
        who_workaround_swap += 1
    elif not relax:
        return who_workaround(request, review, True)
    else:
        who_workaround_miss += 1

    return who

# Walk data points in order by time, adding up deltas and merging points that
# share the same time. Data is converted to dict() and written to influx in
# batches to avoid the extra memory required to hold all data as dict() and to
# avoid influxdb allocating memory for the entire incoming data set at once.
def walk_points(points, target):
    global client

    measurements = set()
    counters = {}
    final = []
    time_last = None
    wrote = 0
    for point in sorted(points, key=lambda l: l.time):
        if point.measurement not in measurements:
            # Wait until just before writing to drop the measurement.
            client.drop_measurement(point.measurement)
            measurements.add(point.measurement)

        if point.time != time_last and len(final) >= 1000:
            # Write final points in batches of ~1000, but guard against writing
            # in the middle of a run of points sharing the same time as they may
            # end up being merged. As such the previous time should not match the current one.
            client.write_points(final, 's')
            wrote += len(final)
            final = []
        time_last = point.time

        if not point.delta:
            final.append(dict(point._asdict()))
            continue

        # A more generic mechanism, like the 'key' tag which ended up being needed, is likely better.
        measurement = counters_tag_key = point.measurement
        if measurement == 'staging':
            counters_tag_key += point.tags['id']
        elif measurement == 'review_count':
            counters_tag_key += '_'.join(point.tags['key'])
        elif measurement == 'priority':
            counters_tag_key += point.tags['level']
        counters_tag = counters.setdefault(counters_tag_key, {'last': None, 'values': {}})

        values = counters_tag['values']
        for key, value in point.fields.items():
            values[key] = values.setdefault(key, 0) + value

        if counters_tag['last'] and point.time == counters_tag['last']['time']:
            point = counters_tag['last']
        else:
            point = dict(point._asdict())
            counters_tag['last'] = point
            final.append(point)
        point['fields'].update(counters_tag['values'])

    # Write any remaining final points.
    client.write_points(final, 's')
    return wrote + len(final)

def ingest_release_schedule(project):
    points = []
    release_schedule = {}
    release_schedule_file = os.path.join(SOURCE_DIR, 'metrics/annotation/{}.yaml'.format(project))
    if project.endswith('Factory'):
        # TODO Pending resolution to #1250 regarding deployment; until then the
        # snapshot extraction below is intentionally unreachable.
        return 0

        # Extract Factory "release schedule" from Tumbleweed snapshot list.
        command = 'rsync rsync.opensuse.org::opensuse-full/opensuse/tumbleweed/iso/Changes.* | ' \
                  'grep -oP "Changes\.\K\d{5,}"'
        snapshots = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE).communicate()[0]
        for date in snapshots.split():
            release_schedule[datetime.strptime(date, '%Y%m%d')] = 'Snapshot {}'.format(date)
    elif os.path.isfile(release_schedule_file):
        # Load release schedule for non-rolling releases from yaml file.
        with open(release_schedule_file, 'r') as stream:
            release_schedule = yaml.safe_load(stream)
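
    # The annotation YAML is assumed to map dates to description strings, for
    # example (illustrative):
    #   2017-07-26: 'Leap 42.3 release'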
    for date, description in release_schedule.items():
        points.append({
            'measurement': 'release_schedule',
            'fields': {'description': description},
            'time': timestamp(date),
        })

    client.write_points(points, 's')
    return len(points)

def main(args):
    global client
    client = InfluxDBClient(args.host, args.port, args.user, args.password, args.project)

    osc.conf.get_config(override_apiurl=args.apiurl)
    osc.conf.config['debug'] = args.debug

    # Ensure database exists.
    client.create_database(client._database)

    metrics_release.ingest(client)
    if args.release_only:
        return

    # Use separate cache since it is persistent.
    Cache.CACHE_DIR = os.path.expanduser('~/.cache/osc-plugin-factory-metrics')
    if args.wipe_cache:
        Cache.delete_all()
    Cache.PATTERNS['/search/request'] = sys.maxint
    Cache.init()

    Config(args.project)
    api = StagingAPI(osc.conf.config['apiurl'], args.project)

    global who_workaround_swap, who_workaround_miss
    who_workaround_swap = who_workaround_miss = 0

    points_requests = ingest_requests(api, args.project)
    points_schedule = ingest_release_schedule(args.project)

    print('who_workaround_swap', who_workaround_swap)
    print('who_workaround_miss', who_workaround_miss)

    print('wrote {:,} points and {:,} annotation points to db'.format(
        points_requests, points_schedule))

if __name__ == '__main__':
    description = 'Ingest relevant OBS and annotation data to generate insightful metrics.'
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('-A', '--apiurl', help='OBS instance API URL')
    parser.add_argument('-d', '--debug', action='store_true', help='print useful debugging info')
    parser.add_argument('-p', '--project', default='openSUSE:Factory', help='OBS project')
    parser.add_argument('--host', default='localhost', help='InfluxDB host')
    parser.add_argument('--port', default=8086, help='InfluxDB port')
    parser.add_argument('--user', default='root', help='InfluxDB user')
    parser.add_argument('--password', default='root', help='InfluxDB password')
    parser.add_argument('--wipe-cache', action='store_true', help='wipe GET request cache before executing')
    parser.add_argument('--release-only', action='store_true', help='ingest release metrics only')
    args = parser.parse_args()

    sys.exit(main(args))