# wpsToolsBox.py -- client helpers for a WPS 2.0.0 backend.
# Committed by algodeveloper1@free.fr.
import xml.etree.ElementTree as ET
import requests
import sys
sys.path.insert(0, '/wps_natebook/owslib')
from owslib.wps import WebProcessingService
import time
import json


class wpsTB:
    '''
    Small toolbox around a WPS 2.0.0 backend.

    Loads the capabilities document through owslib's WebProcessingService
    and talks to the backend's execute / getStatus / result HTTP endpoints
    with requests. Every request carries the OAuth bearer token given at
    construction time.
    '''

    def __init__(self, oauth_token, copa_backend_url, user_id):
        '''
        Store the credentials and load the WPS capabilities

        Input:
            oauth_token (str): token sent as "Authorization: Bearer <token>"
            copa_backend_url (str): base URL of the backend; endpoint paths
                such as 'wps/capabilities' are appended to it as-is
            user_id (str): value of the "userId" query parameter
        '''
        self.oauth_token = oauth_token
        self.copa_backend_url = copa_backend_url
        self.user_id = user_id

        self.wps = WebProcessingService(
            self.copa_backend_url + 'wps/capabilities',
            headers=self._auth_headers(),
            version="2.0.0")

    def _auth_headers(self):
        '''
        Build a fresh header dict holding the bearer-token authorization
        '''
        return {'Authorization': 'Bearer ' + self.oauth_token}

    def wps_info(self):
        '''
        Print the service identification, provider and operations of the wps
        '''
        print("--- Service identification ---")
        identification = self.wps.identification
        print('Title: ', identification.title)
        print('Type: ', identification.type, identification.version)
        print(" ")
        print("--- Service Provider ---")
        provider = self.wps.provider
        print('Provider: ', provider.name, provider.contact.email, provider.url)
        print(" ")
        print("--- Service Operations ---")
        for operation in self.wps.operations:
            print(operation.name)

    def wps_processes(self):
        '''
        Print the identifier and title of every available process
        '''
        print("--- Service Processes ---")
        processes = self.wps.processes
        print("There are {0} processes available for execution".format(len(processes)))
        print(" ")
        print("======================================================")
        print("|Identifier                | Title                ")
        print("======================================================")
        for process in processes:
            print("|{0}  | {1} ".format(process.identifier, process.title))
            print("------------------------------------------------------")

    def wps_process_info(self, process_name=None, process_id=None):
        '''
        Print title, description and identifier of the matching process(es)

        A process matches when its title equals process_name or its
        identifier equals process_id. A criterion left to None is ignored,
        so that processes without a title or identifier are not matched
        by accident.

        Input:
            process_name (str): name (title) of the process
            process_id (str): identifier of the process
        '''
        for process in self.wps.processes:
            name_matches = process_name is not None and process.title == process_name
            id_matches = process_id is not None and process.identifier == process_id
            if name_matches or id_matches:
                print("Title       :", process.title)
                print("Description :", process.abstract)
                print("Identifier  :", process.identifier)

    def wps_execute_process(self, algo_wps_xml, nb_iter, poll_interval=10, submit_delay=1):
        '''
        Execute an algorithm one or several times and wait for all the jobs

        Every job is submitted with wps_execute, then the getStatus endpoint
        is polled until each job reports "SUCCEEDED" or "FAILED". Any other
        status keeps the job in the polling loop.

        Input:
            algo_wps_xml (str or list): path to the xml conf file, or a list
                of paths (one job is submitted per path)
            nb_iter (int): number of submissions of the single conf file;
                ignored when algo_wps_xml is a list
            poll_interval (float): seconds to wait between two polling passes
            submit_delay (float): seconds to wait after each submission

        Returns:
            list: the "jobID" values reported by the backend for the
                finished jobs, in the order they were seen finished
        '''
        start_time = time.time()

        if isinstance(algo_wps_xml, list):
            xml_paths = algo_wps_xml
        else:
            xml_paths = [algo_wps_xml] * nb_iter

        pending = []
        for xml_path in xml_paths:
            pending.append(self.wps_execute(xml_path))
            time.sleep(submit_delay)

        status_url = self.copa_backend_url + 'wps/getStatus?userId=' + self.user_id
        finished = []
        while pending:
            # Build a new list rather than removing from the one being
            # iterated: removing in place makes the loop skip the next job.
            still_pending = []
            for job_id in pending:
                response = requests.post(status_url, headers=self._auth_headers(),
                                         json={"jobID": job_id})
                parsed = json.loads(response.text)
                if parsed["status"] in ("SUCCEEDED", "FAILED"):
                    print(parsed["status"])
                    finished.append(parsed["jobID"])
                else:
                    still_pending.append(job_id)
            pending = still_pending
            # No need to wait once the last job has finished.
            if pending:
                time.sleep(poll_interval)

        print('Execution time was', time.time() - start_time)

        return finished

    def wps_execute(self, algo_wps_xml):
        '''
        Submit one execute request to the wps

        Input:
            algo_wps_xml (str): path to the xml conf file

        Returns:
            str: text of the JobID element (WPS 2.0 namespace) found directly
                under the root of the response

        Raises AttributeError when the response root has no such JobID child.
        '''
        print("--- EXECUTE---")

        url = self.copa_backend_url + 'wps?userId=' + self.user_id
        request_root = ET.parse(algo_wps_xml).getroot()

        headers = self._auth_headers()
        headers['Content-Type'] = 'application/xml'
        response = requests.post(url, headers=headers, data=ET.tostring(request_root))

        print(response.text)
        response_root = ET.fromstring(response.text)

        return response_root.find('{http://www.opengis.net/wps/2.0}JobID').text

    def wps_result(self, jobId):
        '''
        Fetch the result of a job

        Input:
            jobId (str): identifier of the job, as returned by wps_execute

        Returns:
            the raw requests response of the GET on 'wps/<jobId>'
        '''
        url = self.copa_backend_url + 'wps/' + jobId + '?userId=' + self.user_id
        return requests.get(url, headers=self._auth_headers())

    def add_value(self, xml_file, input_id, input_value, xml_out):
        '''
        Add element (value, path, ...) in the xml conf file

        Every direct child of the root whose "id" attribute equals input_id
        gets input_value written as the text of its first grandchild
        (child[0][0]). Children without an "id" attribute are left untouched.

        Input:
            xml_file (str): path to the empty xml conf file
            input_id (str): ID that must be completed
            input_value (str): element that must be added
            xml_out (str): path to the new xml conf file
        '''
        tree = ET.parse(xml_file)

        for child in tree.getroot():
            # Not every child carries an "id" (only compare those that do).
            if "id" in child.attrib and child.attrib["id"] == input_id:
                child[0][0].text = input_value

        tree.write(xml_out)

    def get_url(self, response, data):
        '''
        Find data url after the GetResult

        Looks for "<data>?" in the response text, then takes the text going
        from the first 'https' found in the 200 characters before it up to
        the '"/>' found in the 200 characters after it.

        Input:
            response : GetResult response (its .text attribute is searched)
            data (str): data name

        Returns:
            str: the url, or "" when the data name or one of the two
                delimiters cannot be found
        '''
        text = response.text

        pointer = text.find("{0}?".format(data))
        if pointer == -1:
            return ""

        # Clamp at 0: a negative offset would be counted from the end of the
        # text by str.find and the url would be missed.
        url_start = text.find('https', max(0, pointer - 200), pointer + 1)
        url_end = text.find('"/>', max(0, pointer - 1), pointer + 200)
        if url_start == -1 or url_end == -1:
            return ""

        return text[url_start:url_end]