4
\$\begingroup\$

The following code works and will be put into cron on a AWS server (not sure what specifications, cores, kernels etc) at a every 1 minute frequency.

However, it's my first time putting a script into production, so I'd like some help reviewing this code. Please look it over and point to any hidden mistakes or something that's risky to implement or is just bad design. Additionally, it currently runs within a minute because I've limited the design to optimize 100 rows of data at a time.

Could this be made faster and allow me to optimize even more rows? Hypothetically, I can have row count in the 10s of 1000s, but since I sort them according to my cost, they become less valuable down below- but still if it can be done, it'll help after a while when the product scales.

Stack: MySQL where main data is stored & fetched. Redis database is where changes need to go. This script will be placed in a 1 minute cron in a separate server. All are on independant AWS servers. I haven't really taken any redis based optimization while building this, not sure how much of a difference it could make.

import MySQLdb
import pandas as pd
import datetime
import redis
from decimal import Decimal, DecimalException
import math
import cPickle as pickle
import re
from pandas.util.testing import assert_frame_equal

conn = MySQLdb.connect(host="AWS_HOST", user="root", passwd="pasword",db="db_name", local_infile=1)
cursor = conn.cursor(MySQLdb.cursors.DictCursor)
cursor.execute("SET NAMES utf8")
now = datetime.datetime.now()+datetime.timedelta(minutes=330)
subtract = now.minute if now.minute < 30 else now.minute-30
now-=datetime.timedelta(minutes=subtract)
read_query = """
select group, url, sum(acost)/1000000 cost, sum(rpm-acost)/1000000 pft, 
sum(lg) imps, sum(acost)/1000000/sum(lg)*1000 cpm, sum(rpm)/1000000/sum(lg)*1000 rpm,
sum(rpm-acost)/1000000/sum(lg)*1000 ppm
from db_name.db 
where  dh = '{}'
group by group, url having pft < 0
order by sum(acost) desc;""".format(now.strftime('%Y-%m-%d %H:%M:00'))
cursor.execute(read_query)
cost_data_new = pd.DataFrame(list(cursor.fetchall()))

cost_data_old = pickle.load( open( "cost_data_old.p", "rb" ))
## manage structure

global changed,r 
r = redis.Redis(host='AWS2_HOST', port=PORT, db=0)
changed=True
try:
    if assert_frame_equal(cost_data_new, cost_data_old[['group','url','cost','pft','imps','cpm','rpm','ppm']], check_dtype=False):
        changed=False
except:
    changed=True
def rename_x(col_name):
    if re.match(r".*\_(x)$", col_name):
        return (col_name[:-2])
    else:
        return col_name

def cal_rpm(rw):
    try:
        return (rw['pft_diff']+rw['cost_diff'])/rw['imps_diff']*1000
    except DecimalException:
        return 0
    except Exception as e:
        raise e
        #print rw['group'], rw['url'], rw['pft_diff'], rw['cost_diff'], rw['imps_diff']
        print "proceeding with 0"
        return 0

def cal_cpm(rw):
    try:
        return rw['cost_diff']/rw['imps_diff']*1000
    except DecimalException:
        return 0
    except Exception as e:
        raise e
        #print rw['group'], rw['url'], rw['cost_diff'], rw['imps_diff']
        print "proceeding with 0"
        return 0

def cal_ppm(rw):
    try:
        return rw['pft_diff']/rw['imps_diff']*1000
    except DecimalException:
        return 0
    except Exception as e:
        raise e
        print "proceeding with 0"
        return 0
def get_diff(new,old):
    diff = pd.merge(new,old,how='outer',on=['group','url'])
    diff.fillna(0,inplace=True)
    diff['pft_diff']=diff['pft_x']-diff['pft_y']
    diff['cost_diff']=diff['cost_x']-diff['cost_y']
    diff['imps_diff']=diff['imps_x']-diff['imps_y']
    diff['diff_rpm']=diff[['group','url','pft_diff','cost_diff','imps_diff']].apply(cal_rpm,axis=1)
    diff['diff_cpm']=diff[['group','url','cost_diff','imps_diff']].apply(cal_cpm,axis=1)
    diff['diff_ppm']=diff[['group','url','pft_diff','imps_diff']].apply(cal_ppm,axis=1)
    diff=diff.rename(columns=rename_x)
    diff.drop(list(diff.filter(regex = '_y')), axis = 1, inplace = True)
    try:
        del diff['optimized']
    except:
        pass
    return diff

def calc_bid_prob(lgpm):
    beta = 0.01
    alpha = 1
    infl = 13
    slope = 1
    prob=int(100 * (beta + (alpha - beta) / (1 + math.pow((lgpm / infl), slope))))
    return prob

def optimize_val(rw):
    ppm = rw['diff_ppm']
    lg = rw['imps_diff']
    rpm = rw['diff_rpm']
    cpm = rw['diff_cpm']
    bid = r.hget("plcmnt:{}".format(rw['group']),"{}".format(rw['url']))
    try:
        bid=int(bid)
    except:
        bid=20
    b_prob=r.hget("url:prob:{}".format(rw['group']),"{}".format(rw['url']))
    try:
        if ppm < -1:
            if rpm >= 2:
                new_bid = min((1-0.3)*rpm,bid)
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("plcmnt:{}".format(rw['group']),rw['url'],new_bid)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid)
            else:
                new_bid = min((1-0.5)*rpm,bid)
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("plcmnt:{}".format(rw['group']),rw['url'],new_bid)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid_prob)
        elif (ppm >= -1) & (ppm < 0):
            if rpm >= 2:
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid_prob)
            else:
                new_bid = min((1-0.3)*rpm,bid)
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("plcmnt:{}".format(rw['group']),rw['url'],new_bid)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid_prob)
        else:
            pass
        return 1
    except Exception as e:
        #log exception
        return 0
## if not changed, proceed with old, optimize unoptimized top 100 and store as new pickle
if changed:
    ## optimize diff_data
    cost_data_diff=get_diff(cost_data_new,cost_data_old[['group','url','cost','pft','imps','cpm','rpm','ppm']])
    cost_data_diff['optimized']=0
    cost_data_diff.sort_values(by=['cost_diff'],ascending=False,inplace=True)
    optimize=cost_data_diff.head(100)
    optimize['optimized']=optimize.apply(optimize_val,axis=1)
    pickle.dump(cost_data_diff,open("cost_data_old.p","wb"))
else:
    cost_data_old.sort_values(by=['cost'],ascending=False,inplace=True)
    optimize = cost_data_old[cost_data_old['optimized']==0].head(100)
    optimize['optimized']=optimize.apply(optimize_val,axis=1)
    pickle.dump(cost_data_old,open("cost_data_old.p","wb"))


import sys
sys.exit()

cost_data_old_example:

      group acost_x cpm_x      imps_x  pft_x        ppm_x   rpm_x  \
0     6841  0.0002  0.12150000      2  -0.0002  -0.12150000  0E-8   
1     6891  0.0002  0.19900000      1  -0.0002  -0.19900000  0E-8   
2     7174  0.0001  0.14900000      1  -0.0001  -0.14900000  0E-8   
3     6732  0.0001  0.14600000      1  -0.0001  -0.14600000  0E-8   
4     6882  0.0001  0.13500000      1  -0.0001  -0.13500000  0E-8   
5     6856  0.0001  0.10700000      1  -0.0001  -0.10700000  0E-8   
6     6838  0.0001  0.08700000      1  -0.0001  -0.08700000  0E-8   
7     6838  0.0001  0.08600000      1  -0.0001  -0.08600000  0E-8   
8     6838  0.0001  0.08600000      1  -0.0001  -0.08600000  0E-8   

                                       url  cost_y    pft_y imps_y  \
0                                 url1.org  0.0002  -0.0002      2   
1                                 url2.com  0.0002  -0.0002      1   
2                                 url3.com  0.0001  -0.0001      1   
3                                  url4.tv  0.0001  -0.0001      1   
4                                 url5.com  0.0001  -0.0001      1   
5                                 url6.com  0.0001  -0.0001      1   
6                                 url7.com  0.0001  -0.0001      1   
7                                 url8.com  0.0001  -0.0001      1   
8                                 url9.com  0.0001  -0.0001      1   

        cpm_y rpm_y        ppm_y pft_diff cost_diff imps_diff  
0  0.12150000  0E-8  -0.12150000   0.0000    0.0000         0  
1  0.19900000  0E-8  -0.19900000   0.0000    0.0000         0  
2  0.14900000  0E-8  -0.14900000   0.0000    0.0000         0  
3  0.14600000  0E-8  -0.14600000   0.0000    0.0000         0  
4  0.13500000  0E-8  -0.13500000   0.0000    0.0000         0  
5  0.10700000  0E-8  -0.10700000   0.0000    0.0000         0  
6  0.08700000  0E-8  -0.08700000   0.0000    0.0000         0  
7  0.08600000  0E-8  -0.08600000   0.0000    0.0000         0  
8  0.08600000  0E-8  -0.08600000   0.0000    0.0000         0 

cost_data_new_shape:

(1587, 8)
\$\endgroup\$

1 Answer 1

2
\$\begingroup\$

In no particular order:

  • I have a hard time understanding why the get_diff function use vectorized operations sometimes and apply some other times. If the aim it to avoid NaNs in the dataframe, you can simply use fillna afterwards. If you want to avoid inf that would come from a division by 0, you could replace them with zeros or NaNs depending on your use-case. In any case, the cal_xxx functions are better replaced by vectorized operations.
  • Your lack of whitespace and your usage of acronyms as variable names makes your code hard to read and understand.
  • You open things that you never close, this includes files and database connections. The with statement and the closing utility are your friends here.
  • assert_frame_equal can safely be replaced by equals that behaves more nicely: it only returns True or False. The changed variable can then be safely eliminated.
  • Your top-level code is better put into one or several functions to improve maintainability and testing. In the same vein, the global keyword is to be avoided and having functions (such as optimize_val) rely on the external initialisation of an object they need to use is prone to errors. Instead, pass the object as a parameter; and if you can't controll the calling point (such as using apply that will call the function with a single parameter), you can make use of functools.partial to bind some parameters beforehand.
  • You should avoid putting credentials into your script, pass them on the command line (and use argparse for instance to retrieve them).
  • Don't fall into the habit of preparing SQL statement using format. Even though in this instance it sounds rather safe, you better train yourself to use parametrized statements.

Proposed improvements:

import re
import math
import datetime
import cPickle as pickle
from functools import partial
from contextlib import closing

import redis
import MySQLdb
import pandas as pd


READ_QUERY = """\
SELECT group, url,
       sum(lg) imps,
       sum(acost)/1000000 cost,
       sum(rpm-acost)/1000000 pft,
       sum(rpm)/1000000/sum(lg)*1000 rpm,
       sum(acost)/1000000/sum(lg)*1000 cpm,
       sum(rpm-acost)/1000000/sum(lg)*1000 ppm
FROM db_name.db
WHERE dh = %s
GROUP BY group, url HAVING pft < 0
ORDER BY sum(acost) DESC;"""


def read_database(host, user, password, database, local_infile=True):
    now = datetime.datetime.now() + datetime.timedelta(minutes=330)
    minutes = 0 if now.minute < 30 else 30
    date = now.replace(minute=minutes, second=0, microsecond=0)

    with closing(MySQLdb.connect(host=host, user=user, passwd=password, db=database, local_infile=int(local_infile))) as connection:
        cursor = connection.cursor(MySQLdb.cursors.DictCursor)
        cursor.execute("SET NAMES utf8")
        cursor.execute(READ_QUERY, (date,))
        return pd.DataFrame(list(cursor.fetchall()))


def rename_x(col_name):
    if re.match(r'.*\_(x)$', col_name):
        return (col_name[:-2])
    else:
        return col_name


def get_diff(new, old):
    diff = pd.merge(new, old, how='outer', on=['group', 'url'])
    diff.fillna(0, inplace=True)
    diff['pft_diff'] = diff['pft_x'] - diff['pft_y']
    diff['cost_diff'] = diff['cost_x'] - diff['cost_y']
    diff['imps_diff'] = diff['imps_x'] - diff['imps_y']
    diff['diff_rpm'] = (diff['pft_diff'] + diff['cost_diff']) / diff['imps_diff'] * 1000
    diff['diff_cpm'] = diff['cost_diff'] / diff['imps_diff'] * 1000
    diff['diff_ppm'] = diff['pft_diff'] / diff['imps_diff'] * 1000
    diff = diff.rename(columns=rename_x)
    diff.drop(list(diff.filter(regex='_y')), axis=1, inplace=True)
    diff.replace(pd.np.inf, 0, inplace=True)
    diff['optimized'] = 0
    return diff


def calc_bid_prob(lgpm, alpha=1, beta=0.01, infl=13, slope=1):
    return int(100 * (beta + (alpha - beta) / (1 + math.pow((lgpm / infl), slope))))


def optimize_val(rw, redis):
    ppm = rw['diff_ppm']
    lg = rw['imps_diff']
    rpm = rw['diff_rpm']

    redis_group_plcmnt = 'plcmnt:{}'.format(rw['group'])
    redis_group_url = 'url:prob:{}'.format(rw['group'])
    redis_url = rw['url']
    bid = redis.hget(redis_group_plcmnt, redis_url)
    try:
        bid = int(bid)
    except ValueError:
        bid = 20

    b_prob = redis.hget(redis_group_url, redis_url)
    try:
        if ppm < 0:
            if ppm < -1:
                if rpm >= 2:
                    new_bid = min((1-0.3)*rpm, bid)
                else:
                    new_bid = min((1-0.5)*rpm, bid)
            else:
                if rpm < 2:
                    new_bid = min((1-0.3)*rpm, bid)
            new_bid_prob = min(calc_bid_prob(lg), b_prob)
            redis.hset(redis_group_plcmnt, redis_url, new_bid)
            redis.hset(redis_group_url, redis_url, new_bid_prob)
        return 1
    except Exception:
        return 0


def compare_and_optimize(cost_data, optimizer, filename='cost_data_old.p'):
    with open(filename, 'rb') as pickled_file:
        cost_data_old = pickle.load(pickled_file)

    if cost_data.equals(cost_data_old):
        # If not changed, proceed with old, optimize unoptimized top 100 and store as new pickle
        cost_data_result = cost_data_old
        cost_data_old.sort_values(by=['cost'], ascending=False, inplace=True)
        optimize = cost_data_old[cost_data_old['optimized'] == 0].head(100)
    else:
        # Optimize diff_data
        cost_data_result = get_diff(cost_data, cost_data_old[['group', 'url', 'cost', 'pft', 'imps', 'cpm', 'rpm', 'ppm']])
        cost_data_result.sort_values(by=['cost_diff'], ascending=False, inplace=True)
        optimize = cost_data_result.head(100)

    optimize['optimized'] = optimize.apply(optimizer, axis=1)
    with open(filename, 'wb') as pickle_file:
        pickle.dump(cost_data_result, pickle_file)


if __name__ == '__main__':
    cost_data = read_database('AWS_HOST', 'root', 'password', 'db_name', True)
    redis_db = redis.Redis(host='AWS2_HOST', port=PORT, db=0)
    compare_and_optimize(cost_data, partial(optimize_val, redis=redis_db))
\$\endgroup\$
1
  • \$\begingroup\$ Thank you for the suggestions. Let me test this and get back to you \$\endgroup\$ Commented May 1, 2018 at 9:10

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.