Commit 44d97824 authored by Christian Jürges's avatar Christian Jürges
Browse files

updated occron to multiple profiles

parent f17d170b
......@@ -2,9 +2,11 @@
__author__ = 'chris'
import configparser
import json
import logging
import logging.handlers
import os
import re
import socket
import subprocess
......@@ -25,70 +27,112 @@ else:
class Worker(threading.Thread):
def __init__(self, thread_id, line, config):
send_mail_regex = re.compile('activity:send-e?mails')
def __init__(self, thread_id, instance, config, code_path_list):
threading.Thread.__init__(self)
self.threadID = thread_id
self.counter = thread_id
self.base_path = config.get('occron', 'basedir')
self.cron_php = config.get('occron', 'cron.php')
self.php = config.get('occron', 'php')
self.sendmail = config.get('occron', 'sendmail')
self.oc_system_cron = config.get('occron', 'ocsystemcron')
# set defaults for sendmail and cron
self.sendmail = None
# set default cron to nextcloud cron
self.cron = config.get('occron', 'cron.php')
# set max allowed execution time
self.max_execution_time = int(config.get('occron', 'max_execution_time'))
self.instance = instance
self.code_path_list = code_path_list
self.occ_path = os.path.dirname(self.base_path) + '/' + line + '/' + self.cron_php
self.sendmail_path = os.path.dirname(self.base_path) + '/' + line + '/' + self.sendmail
self.oc_system_cron_path = os.path.dirname(self.base_path) + '/' + line + '/' + self.oc_system_cron
def get_occ_commands(self, path):
p, out = self.subproc(cmd=" ".join((self.php, os.path.join(path, 'occ list --format json'))))
def run(self):
logging.debug("%s is started" % self.threadID)
if os.path.exists(self.occ_path):
# sendmail
os.chdir(os.path.dirname(self.occ_path))
cmd = shlex.split('nice -n 19 %s %s ' % (self.php, self.sendmail_path))
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
p.communicate()
logging.info(
'%s: occ call [%s %s] returned %d' % (self.counter, self.php, self.sendmail_path, p.returncode))
# nc cron
# cmd = ['nice', '-n', '19', self.php, self.occ_path]
cmd = shlex.split('nice -n 19 %s %s ' % (self.php, self.occ_path))
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
try:
p.communicate(timeout=self.max_execution_time) # kill after defined seconds
logging.info(
'%s: occ cron call [%s %s] has return code: %d' % (
self.counter,
self.php,
self.occ_path,
p.returncode))
except subprocess.TimeoutExpired as err:
p.kill()
p.communicate()
logging.error(
'%s: occ cron call [%s %s] timed out after %d seconds. Return code: %d' % (
self.counter,
self.php,
self.occ_path,
self.max_execution_time,
p.returncode))
logging.info(
'%s: occ cron call [%s %s] returned %d' % (self.counter, self.php, self.occ_path, p.returncode))
# oc new cron
cmd = shlex.split('nice -n 19 %s %s' % (self.php, self.oc_system_cron_path))
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
if p.returncode != 0:
return False
try:
commands = json.loads(out)
for command in commands['commands']:
if 'name' in command:
if self.send_mail_regex.match(command['name']):
self.sendmail = 'occ ' + command['name']
continue
# check if this is owncloud using system:cron
if command['name'] == 'system:cron':
self.cron = 'occ ' + command['name'] + ' -q'
except ValueError as e:
logging.error(e)
return False
return True
def call_cron(self, path):
cmd = " ".join((self.php, os.path.join(path, self.cron)))
p, _ = self.subproc(cmd=cmd)
logging.info('%s: %s occ call [%s] returned %d' % (self.counter, self.instance, cmd, p.returncode))
def call_sendmail(self, path):
cmd = " ".join((self.php, os.path.join(path, self.sendmail)))
p, _ = self.subproc(cmd=cmd)
logging.info('%s: %s occ call [%s] returned %d' % (self.counter, self.instance, cmd, p.returncode))
def subproc(self, cmd):
out = None
cmd_list = shlex.split('nice -n 19 %s ' % cmd)
p = subprocess.Popen(cmd_list, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
try:
out, err = p.communicate(timeout=self.max_execution_time) # kill after defined seconds
except subprocess.TimeoutExpired:
p.kill()
p.communicate()
logging.info(
'%s: occ call [%s %s] returned %d' % (self.counter, self.php, self.oc_system_cron_path, p.returncode))
logging.error(
'%s: %s occ cron call [%s] timed out after %d seconds. Return code: %d' % (
self.counter,
self.instance,
cmd,
self.max_execution_time,
p.returncode))
else:
logging.info('%s: OC Instance occ cli %s doest not exist!' % (self.counter, self.occ_path))
return p, out
def run(self):
logging.debug("thread %s started" % self.threadID)
instance_exists = False
for cp in self.code_path_list:
instance_code_root_path = os.path.join(cp, self.instance)
if os.path.exists(instance_code_root_path):
instance_exists = True
# only call if instance is enabled and not broken
if self.get_occ_commands(path=instance_code_root_path):
self.call_cron(path=instance_code_root_path)
# only call if sendmail was detected
if self.sendmail:
self.call_sendmail(path=instance_code_root_path)
else:
logging.error('%s: no sendmail command found for %s' % (self.counter, instance_code_root_path))
else:
logging.error('%s: instance %s could not be called' % (self.counter, instance_code_root_path))
if not instance_exists:
logging.info('%s: instance %s could not be found in any profile!' % (self.counter, self.instance))
logging.info("%s: %s is done." % (self.threadID, self.instance))
logging.debug("%s is done." % self.threadID)
def get_code_profile_path(config):
match_key = 'profile_'
path_list = []
for k, v in config.items():
if k.startswith(match_key):
path_list.append(os.path.join(v.get('base'), v.get('code')))
return path_list
def main():
......@@ -101,6 +145,13 @@ def main():
)
config.read(my_path + '/occron.conf')
oc_config = configparser.ConfigParser()
oc_config.read(os.path.join(my_path, "..", 'tools', 'oc_py.conf'))
code_path_list = get_code_profile_path(oc_config)
# config.set('cp', 'list', code_path_list)
# max. count of subprocesses
max_children = int(config.get('occron', 'max_children'))
......@@ -130,7 +181,7 @@ def main():
exit(-1)
# try to acquire lock to redis
rl = redis_lock.Lock(r, '/occron.lock', expire=10, auto_renewal=True)
rl = redis_lock.Lock(r, '/occron.temp.lock', expire=10, auto_renewal=True)
if rl.acquire(blocking=False):
# read list of oc instances...
......@@ -143,41 +194,41 @@ def main():
print("Missing occron.list")
exit(-1)
for line in lines:
for instance in lines:
# time.sleep(10)
line = line.strip()
instance = instance.strip()
# print(line)
if len(line) > 0:
if line[0] != '#':
if len(instance) > 0:
if instance[0] != '#':
# find unused thread
thread_started = False
free_slot = None
while not thread_started:
for i in range(0, max_children):
if th[i] is None:
logging.debug("%s: using free slot for %s " % (i, line))
logging.debug("%s: using free slot for %s " % (i, instance))
free_slot = i
break
if not th[i].is_alive():
logging.debug("%s: reusing free slot for %s " % (i, line))
logging.debug("%s: reusing free slot for %s " % (i, instance))
free_slot = i
break
if free_slot is not None:
# i is free --> start a thread
logging.debug("%s: starting a thread on slot for %s " % (free_slot, line))
th[free_slot] = Worker(free_slot, line, config)
logging.debug("%s: starting a thread on slot for %s " % (free_slot, instance))
th[free_slot] = Worker(free_slot, instance, config, code_path_list)
th[free_slot].start()
thread_started = True
time.sleep(1)
time.sleep(0.5)
else:
# wait 5 seconds
logging.debug("all slots used - waiting 1 second...")
time.sleep(1)
time.sleep(0.5)
else:
logging.info('ignoring entry [%s] ' % (line))
logging.info('ignoring entry [%s] ' % instance)
rl.release()
else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment