From 26883b12d275df1bf4dd27d1ddb729b54e157db1 Mon Sep 17 00:00:00 2001
From: Jimmy Berry
Date: Thu, 9 Nov 2017 01:29:57 -0600
Subject: [PATCH] metrics: rework to store points as named tuple and write in
 batches.

Savings of around 400MB per 10,000 requests.

Using a named tuple was the original approach for this reason, but the
influxdb interface requires dict()s and it seemed silly to spend time
converting them. Additionally, the influxdb client already does its own
batching. Unfortunately, with the amount of data processed for Factory,
which will only continue to grow, this approach is necessary.

The final dict() structures are buffered up to ~1000 points before being
written and released. Another benefit of the batching is that influxdb
does not allocate memory for the entire incoming data set at once.
---
 metrics.py | 76 +++++++++++++++++++++++++++++++-----------------------
 1 file changed, 44 insertions(+), 32 deletions(-)

diff --git a/metrics.py b/metrics.py
index de8fa7be..3ee281de 100755
--- a/metrics.py
+++ b/metrics.py
@@ -19,6 +19,7 @@ from osclib.conf import Config
 from osclib.stagingapi import StagingAPI
 
 SOURCE_DIR = os.path.dirname(os.path.realpath(__file__))
+Point = namedtuple('Point', ['measurement', 'tags', 'fields', 'time', 'delta'])
 
 # Duplicate Leap config to handle 13.2 without issue.
 osclib.conf.DEFAULT[
@@ -80,13 +81,7 @@ points = []
 
 def point(measurement, fields, datetime, tags={}, delta=False):
     global points
-    points.append({
-        'measurement': measurement,
-        'tags': tags,
-        'fields': fields,
-        'time': timestamp(datetime),
-        'delta': delta,
-    })
+    points.append(Point(measurement, tags, fields, timestamp(datetime), delta))
 
 def timestamp(datetime):
     return int(datetime.strftime('%s'))
@@ -237,9 +232,8 @@ def ingest_requests(api, project):
             else:
                 print('unable to find priority history entry for {} to {}'.format(request.get('id'), priority.text))
 
-    walk_points(points, project)
-
-    return points
+    print('finalizing {:,} points'.format(len(points)))
+    return walk_points(points, project)
 
 def who_workaround(request, review, relax=False):
     # Super ugly workaround for incorrect and missing data:
@@ -266,37 +260,59 @@ def who_workaround(request, review, relax=False):
     return who
 
+# Walk data points in order by time, adding up deltas and merging points at
+# the same time. Data is converted to dict() and written to influx in batches
+# to avoid the extra memory required to hold all data as dict()s and to avoid
+# influxdb allocating memory for the entire incoming data set at once.
 def walk_points(points, target):
+    global client
+    # Wait until just before writing to drop database.
+    client.drop_database(client._database)
+    client.create_database(client._database)
+
     counters = {}
     final = []
-    for point in sorted(points, key=lambda l: l['time']):
-        if not point['delta']:
-            final.append(point)
+    time_last = None
+    wrote = 0
+    for point in sorted(points, key=lambda l: l.time):
+        if point.time != time_last and len(final) >= 1000:
+            # Write final points in batches of ~1000, but guard against writing
+            # while in the middle of points at the same time, as they may end
+            # up being merged. As such the previous time must not match current.
+            client.write_points(final, 's')
+            wrote += len(final)
+            final = []
+        time_last = point.time
+
+        if not point.delta:
+            final.append(dict(point._asdict()))
            continue
 
         # A more generic method like 'key' which ended up being needed is likely better.
-        measurement = counters_tag_key = point['measurement']
+        measurement = counters_tag_key = point.measurement
         if measurement == 'staging':
-            counters_tag_key += point['tags']['id']
+            counters_tag_key += point.tags['id']
         elif measurement == 'review_count':
-            counters_tag_key += '_'.join(point['tags']['key'])
+            counters_tag_key += '_'.join(point.tags['key'])
         elif measurement == 'priority':
-            counters_tag_key += point['tags']['level']
+            counters_tag_key += point.tags['level']
         counters_tag = counters.setdefault(counters_tag_key, {'last': None, 'values': {}})
 
         values = counters_tag['values']
-        for key, value in point['fields'].items():
+        for key, value in point.fields.items():
             values[key] = values.setdefault(key, 0) + value
 
-        if counters_tag['last'] and point['time'] == counters_tag['last']['time']:
+        if counters_tag['last'] and point.time == counters_tag['last']['time']:
             point = counters_tag['last']
         else:
+            point = dict(point._asdict())
             counters_tag['last'] = point
             final.append(point)
 
         point['fields'].update(counters_tag['values'])
 
-    # In order to allow for merging delta entries for the same time.
-    points = final
+    # Write any remaining final points.
+    client.write_points(final, 's')
+    return wrote + len(final)
 
 def ingest_release_schedule(project):
     points = []
@@ -321,9 +337,13 @@ def ingest_release_schedule(project):
             'time': timestamp(date),
         })
 
-    return points
+    client.write_points(points, 's')
+    return len(points)
 
 def main(args):
+    global client
+    client = InfluxDBClient(args.host, args.port, args.user, args.password, args.project)
+
     osc.conf.get_config(override_apiurl=args.apiurl)
     osc.conf.config['debug'] = args.debug
 
@@ -344,16 +364,8 @@ def main(args):
     print('who_workaround_swap', who_workaround_swap)
     print('who_workaround_miss', who_workaround_miss)
 
-    print('writing {:,} points and {:,} annotation points to db'.format(
-        len(points_requests), len(points_schedule)))
-
-    db = args.project
-    client = InfluxDBClient(args.host, args.port, args.user, args.password, db)
-    client.drop_database(db)
-    client.create_database(db)
-
-    client.write_points(points_requests, 's')
-    client.write_points(points_schedule, 's')
+    print('wrote {:,} points and {:,} annotation points to db'.format(
+        points_requests, points_schedule))
 
 if __name__ == '__main__':
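
Note: below is a minimal standalone sketch of the batching pattern this patch adopts, storing points as namedtuples and converting them to dict()s only in ~1000-point batches immediately before InfluxDBClient.write_points(). The Point fields, the write_batched() name, and the connection values are illustrative assumptions rather than code from metrics.py, and the sketch omits the delta accumulation and same-timestamp merge guard that walk_points() performs.

# Sketch only; assumes the influxdb Python client package is installed.
from collections import namedtuple
from influxdb import InfluxDBClient

# Lightweight point representation; converted to dict() only just before writing.
Point = namedtuple('Point', ['measurement', 'tags', 'fields', 'time'])

def write_batched(client, points, batch_size=1000):
    # Convert namedtuples to dict()s in small batches so only ~batch_size
    # dict()s are alive at once, instead of one per point for the whole run.
    batch = []
    written = 0
    for p in sorted(points, key=lambda p: p.time):
        batch.append(dict(p._asdict()))
        if len(batch) >= batch_size:
            client.write_points(batch, 's')  # 's' = second time precision
            written += len(batch)
            batch = []
    if batch:
        client.write_points(batch, 's')
        written += len(batch)
    return written

# Hypothetical usage (connection parameters are placeholders):
# client = InfluxDBClient('localhost', 8086, 'root', 'root', 'openSUSE:Factory')
# wrote = write_batched(client, [Point('release_schedule', {}, {'phase': 'beta'}, 1510214997)])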