I have the following code that iterates over a list such as ['ABC','AAA','BBB'], sends HTTP requests to an API, saves the received data to another list, and sends it via email and Telegram. At the moment the list is processed sequentially, which is slow.

I am trying to process this data in parallel using asyncio, but on execution I get a type error: TypeError: 'async for' requires an object with __aiter__ method, got Series

Can you advise how best to solve this problem, or how to convert the array type correctly?

Current code:

for value in alldata:
            print('Processing', value)           
            today = str(datetime.today().strftime('%d.%m.%Y'))
            if debug_mode is True:
                start = "18.09.2020" 
                end = "18.09.2020"
            else:
                start = today
                end = today
            ########
            periods={'tick': 1, 'min': 2, '5min': 3, '10min': 4, '15min': 5, '30min': 6, 'hour': 7, 'daily': 8, 'week': 9, 'month': 10}
            print ("value="+value+"; period="+str(period)+"; start="+start+"; end="+end)
            try:
                Ids = str((urlopen('https://testapi.dev/export.js').readlines())[0])
                Codes = str((urlopen('https://testapi.dev/export.js').readlines())[2])
            except Exception:
                print('Cannot get Ids & Codes')
            try:
                index = Codes.index(value)
                symbol_code = str(Ids[index])
            except Exception:
                try:
                    Ids = str((urlopen('https://testapi.dev/import')
                    Codes = str((urlopen('https://testapi.dev/import')
                    index = Codes.index(value)
                    symbol_code = str(Ids[index])
                except Exception:
                    print("value not in list" ,value)
            region = 0 
            start_date = datetime.strptime(start, "%d.%m.%Y").date()
            start_date_rev=datetime.strptime(start, '%d.%m.%Y').strftime('%Y%m%d')
            end_date = datetime.strptime(end, "%d.%m.%Y").date()
            end_date_rev=datetime.strptime(end, '%d.%m.%Y').strftime('%Y%m%d')
            params = urlencode([
                                ('region', region), 
                                ('symbol', symbol_code), 
                                ('code', value), 
                                ('df', start_date.day), 
                                ('mf', start_date.month - 1), 
                                ('yf', start_date.year), 
                                ('from', start_date), 
                                ('dt', end_date.day), 
                                ('mt', end_date.month - 1), 
                                ('yt', end_date.year), 
                                ('to', end_date), 
                                ('p', period), 
                                ('f', value+"_" + start_date_rev + "_" + end_date_rev)
                                ])

            url = FULL_URL + value+"_" + start_date_rev + "_" + end_date_rev  + params
            try:
                txt=urlopen(url).readlines() 
            except Exception:
                try:
                    time.sleep(random.randint(1, 10))
                    txt=urlopen(url).readlines() 
                except Exception:
                    time.sleep(random.randint(1, 10))
                    txt=urlopen(url).readlines() 
            try:
                imported_data = [] 
                
                for line in txt: 
                    
                    imported_data.append(line.strip().decode( "utf-8" ).replace(',',";"))   
            except Exception:
                print("Cannot get data ")
            try:
                current_value = (imported_data[1].split(";")[0])
                first_price = float(imported_data[1].split(";")[5])
                last_price = float(imported_data[-1].split(";")[5])
                percent_difference = float(  (last_price / first_price) * 100 - 100 )
                time.sleep(int(request_delay))
                if percent_difference > percent_trigger :
                    trigger = True
                    if ( str(value) + ',' + str(today) ) in already_found:
                        print( 'Value ' + str(value) + ' already found' )
                    else:
                        take_profit = last_price * (1 + 5 / 100)
                        found_tickers.append(str(current_value + ',' + str(first_price) + ',' + str(last_price) + ',' + str(take_profit)))
                        already_found.append( str(value) + ',' + str(today) )
                        if send_immediately == 'yes':
                            try:

                                subject = str(value)
                                mail_content = (str(current_value + ',' + str(first_price) + ',' + str(last_price) + ',' + str(take_profit)) )
                                #The mail addresses and password
                                #Setup the MIME
                                message = MIMEMultipart()
                                message['From'] = sender_address
                                message['To'] = receiver_address
                                message['Subject'] =  subject  #The subject line
                                message['X-Priority'] = '1'
                                #The body and the attachments for the mail
                                message.attach(MIMEText(mail_content, 'plain'))
                                #Create SMTP session for sending the mail
                                session = smtplib.SMTP(smtp_server, smtp_port) #use gmail with port
                                session.starttls() #enable security
                                session.login(sender_address, sender_pass) #login with mail_id and password
                                text = message.as_string()
                                session.sendmail(sender_address, receiver_address, text)
                                session.quit()
                            except Exception:
                                print("Cannot send Email")
                            # Sent to telegram
                            try:
                                telegram_bot_sendtext((str(current_value) + ' ' + str(first_price) + ' ' + str(last_price) + ' ' + str(take_profit) ) )
                            except Exception:
                                print("Cannot sent message to Telegram")
                else:
                    trigger = False
            except Exception:
                print("Processing error for value" ,value)

Parallel code:

async def main(alldata):

    for value in alldata:
                print('Processing', value)           
                today = str(datetime.today().strftime('%d.%m.%Y'))
                if debug_mode is True:
                    start = "18.09.2020" 
                    end = "18.09.2020"
                else:
                    start = today
                    end = today
                ########
                periods={'tick': 1, 'min': 2, '5min': 3, '10min': 4, '15min': 5, '30min': 6, 'hour': 7, 'daily': 8, 'week': 9, 'month': 10}
                print ("value="+value+"; period="+str(period)+"; start="+start+"; end="+end)
                try:
                    Ids = str((urlopen('https://testapi.dev/export.js').readlines())[0])
                    Codes = str((urlopen('https://testapi.dev/export.js').readlines())[2])
                except Exception:
                    print('Cannot get Ids & Codes')
                try:
                    index = Codes.index(value)
                    symbol_code = str(Ids[index])
                except Exception:
                    try:
                        Ids = str((urlopen('https://testapi.dev/import')
                        Codes = str((urlopen('https://testapi.dev/import')
                        index = Codes.index(value)
                        symbol_code = str(Ids[index])
                    except Exception:
                        print("value not in list" ,value)
                region = 0 
                start_date = datetime.strptime(start, "%d.%m.%Y").date()
                start_date_rev=datetime.strptime(start, '%d.%m.%Y').strftime('%Y%m%d')
                end_date = datetime.strptime(end, "%d.%m.%Y").date()
                end_date_rev=datetime.strptime(end, '%d.%m.%Y').strftime('%Y%m%d')
                params = urlencode([
                                    ('region', region), 
                                    ('symbol', symbol_code), 
                                    ('code', value), 
                                    ('df', start_date.day), 
                                    ('mf', start_date.month - 1), 
                                    ('yf', start_date.year), 
                                    ('from', start_date), 
                                    ('dt', end_date.day), 
                                    ('mt', end_date.month - 1), 
                                    ('yt', end_date.year), 
                                    ('to', end_date), 
                                    ('p', period), 
                                    ('f', value+"_" + start_date_rev + "_" + end_date_rev)
                                    ])

                url = FULL_URL + value+"_" + start_date_rev + "_" + end_date_rev  + params
                try:
                    txt=urlopen(url).readlines() 
                except Exception:
                    try:
                        time.sleep(random.randint(1, 10))
                        txt=urlopen(url).readlines() 
                    except Exception:
                        time.sleep(random.randint(1, 10))
                        txt=urlopen(url).readlines() 
                try:
                    imported_data = [] 
                    
                    for line in txt: 
                        
                        imported_data.append(line.strip().decode( "utf-8" ).replace(',',";"))   
                except Exception:
                    print("Cannot get data ")
                try:
                    current_value = (imported_data[1].split(";")[0])
                    first_price = float(imported_data[1].split(";")[5])
                    last_price = float(imported_data[-1].split(";")[5])
                    percent_difference = float(  (last_price / first_price) * 100 - 100 )
                    time.sleep(int(request_delay))
                    if percent_difference > percent_trigger :
                        trigger = True
                        if ( str(value) + ',' + str(today) ) in already_found:
                            print( 'Value ' + str(value) + ' already found' )
                        else:
                            take_profit = last_price * (1 + 5 / 100)
                            found_tickers.append(str(current_value + ',' + str(first_price) + ',' + str(last_price) + ',' + str(take_profit)))
                            already_found.append( str(value) + ',' + str(today) )
                            if send_immediately == 'yes':
                                try:

                                    subject = str(value)
                                    mail_content = (str(current_value + ',' + str(first_price) + ',' + str(last_price) + ',' + str(take_profit)) )
                                    #The mail addresses and password
                                    #Setup the MIME
                                    message = MIMEMultipart()
                                    message['From'] = sender_address
                                    message['To'] = receiver_address
                                    message['Subject'] =  subject  #The subject line
                                    message['X-Priority'] = '1'
                                    #The body and the attachments for the mail
                                    message.attach(MIMEText(mail_content, 'plain'))
                                    #Create SMTP session for sending the mail
                                    session = smtplib.SMTP(smtp_server, smtp_port) #use gmail with port
                                    session.starttls() #enable security
                                    session.login(sender_address, sender_pass) #login with mail_id and password
                                    text = message.as_string()
                                    session.sendmail(sender_address, receiver_address, text)
                                    session.quit()
                                except Exception:
                                    print("Cannot send Email")
                                # Sent to telegram
                                try:
                                    telegram_bot_sendtext((str(current_value) + ' ' + str(first_price) + ' ' + str(last_price) + ' ' + str(take_profit) ) )
                                except Exception:
                                    print("Cannot sent message to Telegram")
                    else:
                        trigger = False
                except Exception:
                    print("Processing error for value" ,value)
asyncio.run(main(alldata))
  • This is not parallel code. Commented Sep 26, 2020 at 21:02

1 Answer

Async in Python

Let's begin by clarifying that asynchronous code and multiprocessing are two different approaches to concurrency. Async code runs on a single thread and interleaves tasks while they wait, so an async approach will not be executed in parallel.

If I'm not mistaken, the main function in your parallel version, apart from the async def line, does not have any trace of asynchronicity. Async, at least in Python, usually requires some serious restructuring of the code base: every operation that is to be executed asynchronously (e.g. network requests) has to be refactored and declared as async.
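
To illustrate why the async def line alone changes nothing, here is a minimal sketch using only the standard library. The names blocking_job, cooperative_job, timed and compare are hypothetical, and the sleeps merely stand in for network requests:

```python
import asyncio
import time


async def blocking_job(delay: float) -> None:
    # time.sleep never yields to the event loop, so the jobs run one after another
    time.sleep(delay)


async def cooperative_job(delay: float) -> None:
    # await hands control back to the event loop while this job is waiting
    await asyncio.sleep(delay)


async def timed(job, count: int, delay: float) -> float:
    """Run `count` copies of `job` through gather and return the elapsed seconds."""
    started = time.perf_counter()
    await asyncio.gather(*(job(delay) for _ in range(count)))
    return time.perf_counter() - started


def compare(count: int = 3, delay: float = 0.2):
    """Return (elapsed with blocking sleeps, elapsed with awaited sleeps)."""
    blocking = asyncio.run(timed(blocking_job, count, delay))
    cooperative = asyncio.run(timed(cooperative_job, count, delay))
    return blocking, cooperative


if __name__ == "__main__":
    blocking, cooperative = compare()
    print(f"blocking: {blocking:.2f}s, cooperative: {cooperative:.2f}s")
```

The blocking variant should take roughly count * delay seconds, while the awaited variant should take roughly delay seconds, which is the difference a synchronous urlopen call makes inside an async function.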

On the other hand, multiprocessing in Python is much simpler: import multiprocessing, create a pool & apply your function.
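
A rough sketch of that pool pattern follows. It assumes the body of your loop has been moved into a function, here the hypothetical process_value, and it uses multiprocessing.pool.ThreadPool, the thread-backed class that exposes the same interface as multiprocessing.Pool, because the work is network-bound. Swapping in multiprocessing.Pool should also work, provided the function is picklable and the pool is created under the if __name__ == "__main__": guard.

```python
import time
from multiprocessing.pool import ThreadPool
from typing import List


def process_value(value: str) -> str:
    """Hypothetical stand-in for the body of the original for-loop."""
    time.sleep(0.1)  # simulates a blocking HTTP request
    return value.lower()


def process_all(values: List[str], workers: int = 4) -> List[str]:
    # map distributes the values over the workers and keeps the input order
    with ThreadPool(processes=workers) as pool:
        return pool.map(process_value, values)


if __name__ == "__main__":
    print(process_all(["ABC", "AAA", "BBB"]))
```

Note that shared lists such as found_tickers and already_found would be appended to from several workers at once in this setup, so returning the results from the function and merging them afterwards is probably the safer design.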

Async Example

Since your code is quite extensive and I do not know which steps actually are to be executed asynchronously, here is an example of how async can be used in Python:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import aiohttp
import asyncio
from typing import List, Tuple, Union
from yarl import URL  # yarl is installed together with aiohttp


TIMEOUT: int = 5  # total timeout per request, in seconds


def print_job_started(job_name: str) -> None:
    print(job_name, "STARTED")


def print_job_finished(job_name: str) -> None:
    print(job_name, "FINISHED")


async def asnyc_request_content(
        session: aiohttp.ClientSession,
        method: str,
        url: Union[str, URL],
        timeout: int = TIMEOUT,
        **kwargs
) -> Tuple[str, int]:
    """
    Abstract asynchronous request. Returns the text content & status code.
    """
    # aiohttp expects a ClientTimeout object rather than a bare number
    client_timeout = aiohttp.ClientTimeout(total=timeout)
    async with session.request(method=method, url=url, timeout=client_timeout, **kwargs) as response:
        return await response.text(), response.status


async def fun(session: aiohttp.ClientSession, url: str) -> Tuple[str, int]:
    print_job_started("fun")
    response = await asnyc_request_content(session=session, method="get", url=url)
    print_job_finished("fun")
    return response


async def _main(url_list: List[str]):
    async with aiohttp.ClientSession() as session:
        # schedule one task per URL, then wait for all of them to finish
        tasks = []
        for url in url_list:
            tasks.append(asyncio.ensure_future(fun(session=session, url=url)))
        return await asyncio.gather(*tasks)


def main():
    url_list = [
        "https://example.com" for _ in range(10)
    ]
    # asyncio.run creates the event loop, runs _main to completion and closes the loop
    return asyncio.run(_main(url_list=url_list))


if __name__ == '__main__':
    res = main()
    # fun STARTED
    # fun STARTED
    # fun STARTED
    # fun STARTED
    # fun STARTED
    # fun STARTED
    # fun STARTED
    # fun STARTED
    # fun STARTED
    # fun STARTED
    # fun FINISHED
    # fun FINISHED
    # fun FINISHED
    # fun FINISHED
    # fun FINISHED
    # fun FINISHED
    # fun FINISHED
    # fun FINISHED
    # fun FINISHED
    # fun FINISHED
    for r in res:
        print(r[1])
    # 200
    # 200
    # 200
    # 200
    # 200
    # 200
    # 200
    # 200
    # 200
    # 200

The example is fairly elementary but suffices for demonstration purposes. When I deal with async, my typical workflow is as follows:

  1. define the async functions (asnyc_request_content & fun)
  2. create a first async wrapper where the tasks are defined (_main)
  3. finalize with a second sync wrapper where the loop is defined (main)

This is just one way of implementing it; there are many other alternatives. However, regardless of which alternative you choose, step 1 will always need to be done (and it is usually the most time-consuming one).
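
Mapped onto the shape of your loop, the three steps might look like the sketch below. It uses only the standard library; fetch_price_change is a hypothetical placeholder for your per-value API call, and its return value is made up purely so the example runs without a network.

```python
import asyncio
from typing import List, Tuple


# Step 1: the async function that does the per-item work.
async def fetch_price_change(value: str) -> Tuple[str, float]:
    """Hypothetical stand-in for one API call; returns (value, dummy number)."""
    await asyncio.sleep(0.05)  # placeholder for an awaited HTTP request
    return value, float(len(value))


# Step 2: the async wrapper that creates the tasks and gathers the results.
async def _main(values: List[str]) -> List[Tuple[str, float]]:
    tasks = [asyncio.ensure_future(fetch_price_change(v)) for v in values]
    return await asyncio.gather(*tasks)


# Step 3: the sync wrapper that starts the event loop.
def main(values: List[str]) -> List[Tuple[str, float]]:
    return asyncio.run(_main(values))


if __name__ == "__main__":
    print(main(["ABC", "AAA", "BBB"]))
```

If alldata is a pandas Series, as the error message suggests, a plain for loop over it (as in _main above) or alldata.tolist() should be enough; async for is only meant for objects that implement __aiter__.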


Closing Note

For an async approach, it seems to me you would still have to deal with step 1, which can be quite tedious: your code involves many network requests. If you want fully async code, all lines with requests would need to be refactored. If your goal is a partially async approach, you'd have much less refactoring to do, but all remaining synchronous requests would seriously bottleneck your code (such an approach is usually discouraged).

On the other hand, implementing a multiprocessing approach would be extremely fast (to develop), as you can reuse your code pretty much as-is.

Finally, async could still make a lot of sense (e.g. not enough CPUs/threads for significant parallel processing, as is the case on servers, or because it is more efficient/scalable, ...), but it definitely requires more work.
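
One possible middle ground, which is not part of the workflow above and is offered only as a sketch, is to leave the synchronous calls untouched and hand them to the event loop's default thread pool with loop.run_in_executor. The function blocking_request is a hypothetical stand-in for an unchanged call such as urlopen.

```python
import asyncio
import time
from typing import List


def blocking_request(value: str) -> str:
    """Hypothetical stand-in for an unchanged synchronous call such as urlopen."""
    time.sleep(0.1)
    return f"{value}:done"


async def gather_in_threads(values: List[str]) -> List[str]:
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor
    futures = [loop.run_in_executor(None, blocking_request, v) for v in values]
    return await asyncio.gather(*futures)


def main(values: List[str]) -> List[str]:
    return asyncio.run(gather_in_threads(values))


if __name__ == "__main__":
    print(main(["ABC", "AAA", "BBB"]))
```

This keeps the event loop responsive, but the concurrency then comes from threads rather than from async itself, so it is closer to the pool approach than to a real async refactoring.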

2 Comments

Multiprocessing isn't parallelization either - it's concurrency.
@erip Parallelism is literally in the description of the module.
