diff --git a/salt/utils/schedule.py b/salt/utils/schedule.py index 4458202..cae5fcf 100644 --- a/salt/utils/schedule.py +++ b/salt/utils/schedule.py @@ -482,24 +482,24 @@ class Schedule(object): func = None if func not in self.functions: log.info( - 'Invalid function: {0} in job {1}. Ignoring.'.format( + 'Invalid function: {0} in scheduled job {1}.'.format( func, name ) ) + + if 'name' not in data: + data['name'] = name + log.info( + 'Running Job: {0}.'.format(name) + ) + if self.opts.get('multiprocessing', True): + thread_cls = multiprocessing.Process else: - if 'name' not in data: - data['name'] = name - log.info( - 'Running Job: {0}.'.format(name) - ) - if self.opts.get('multiprocessing', True): - thread_cls = multiprocessing.Process - else: - thread_cls = threading.Thread - proc = thread_cls(target=self.handle_func, args=(func, data)) - proc.start() - if self.opts.get('multiprocessing', True): - proc.join() + thread_cls = threading.Thread + proc = thread_cls(target=self.handle_func, args=(func, data)) + proc.start() + if self.opts.get('multiprocessing', True): + proc.join() def enable_schedule(self): ''' @@ -642,33 +642,39 @@ class Schedule(object): except OSError: log.info('Unable to remove file: {0}.'.format(fn_)) - salt.utils.daemonize_if(self.opts) + try: + salt.utils.daemonize_if(self.opts) - ret['pid'] = os.getpid() + ret['pid'] = os.getpid() - if 'jid_include' not in data or data['jid_include']: - log.debug('schedule.handle_func: adding this job to the jobcache ' - 'with data {0}'.format(ret)) - # write this to /var/cache/salt/minion/proc - with salt.utils.fopen(proc_fn, 'w+b') as fp_: - fp_.write(salt.payload.Serial(self.opts).dumps(ret)) - - args = tuple() - if 'args' in data: - args = data['args'] - - kwargs = {} - if 'kwargs' in data: - kwargs = data['kwargs'] - # if the func support **kwargs, lets pack in the pub data we have - # TODO: pack the *same* pub data as a minion? - argspec = salt.utils.args.get_function_argspec(self.functions[func]) - if argspec.keywords: - # this function accepts **kwargs, pack in the publish data - for key, val in six.iteritems(ret): - kwargs['__pub_{0}'.format(key)] = val + if 'jid_include' not in data or data['jid_include']: + log.debug('schedule.handle_func: adding this job to the jobcache ' + 'with data {0}'.format(ret)) + # write this to /var/cache/salt/minion/proc + with salt.utils.fopen(proc_fn, 'w+b') as fp_: + fp_.write(salt.payload.Serial(self.opts).dumps(ret)) + + args = tuple() + if 'args' in data: + args = data['args'] + + kwargs = {} + if 'kwargs' in data: + kwargs = data['kwargs'] + + if func not in self.functions: + ret['return'] = self.functions.missing_fun_string(func) + salt.utils.error.raise_error( + message=self.functions.missing_fun_string(func)) + + # if the func support **kwargs, lets pack in the pub data we have + # TODO: pack the *same* pub data as a minion? + argspec = salt.utils.args.get_function_argspec(self.functions[func]) + if argspec.keywords: + # this function accepts **kwargs, pack in the publish data + for key, val in six.iteritems(ret): + kwargs['__pub_{0}'.format(key)] = val - try: ret['return'] = self.functions[func](*args, **kwargs) data_returner = data.get('returner', None) @@ -694,28 +700,34 @@ class Schedule(object): ) ) - # Only attempt to return data to the master - # if the scheduled job is running on a minion. - if '__role' in self.opts and self.opts['__role'] == 'minion': - if 'return_job' in data and not data['return_job']: - pass - else: - # Send back to master so the job is included in the job list - mret = ret.copy() - mret['jid'] = 'req' - channel = salt.transport.Channel.factory(self.opts, usage='salt_schedule') - load = {'cmd': '_return', 'id': self.opts['id']} - for key, value in six.iteritems(mret): - load[key] = value - channel.send(load) - + ret['retcode'] = self.functions.pack['__context__']['retcode'] + ret['success'] = True except Exception: log.exception("Unhandled exception running {0}".format(ret['fun'])) # Although catch-all exception handlers are bad, the exception here # is to let the exception bubble up to the top of the thread context, # where the thread will die silently, which is worse. + if 'return' not in ret: + ret['return'] = "Unhandled exception running {0}".format(ret['fun']) + ret['success'] = False + ret['retcode'] = 254 finally: try: + # Only attempt to return data to the master + # if the scheduled job is running on a minion. + if '__role' in self.opts and self.opts['__role'] == 'minion': + if 'return_job' in data and not data['return_job']: + pass + else: + # Send back to master so the job is included in the job list + mret = ret.copy() + mret['jid'] = 'req' + channel = salt.transport.Channel.factory(self.opts, usage='salt_schedule') + load = {'cmd': '_return', 'id': self.opts['id']} + for key, value in six.iteritems(mret): + load[key] = value + channel.send(load) + log.debug('schedule.handle_func: Removing {0}'.format(proc_fn)) os.unlink(proc_fn) except OSError as exc: @@ -757,11 +769,10 @@ class Schedule(object): func = None if func not in self.functions: log.info( - 'Invalid function: {0} in job {1}. Ignoring.'.format( + 'Invalid function: {0} in scheduled job {1}.'.format( func, job ) ) - continue if 'name' not in data: data['name'] = job # Add up how many seconds between now and then