"""
Tax-Calculator Input-Output class.
"""
# CODING-STYLE CHECKS:
# pycodestyle taxcalcio.py
# pylint --disable=locally-disabled taxcalcio.py
import os
import gc
import copy
import sqlite3
import numpy as np
import pandas as pd
import paramtools
from taxcalc.policy import Policy
from taxcalc.records import Records
from taxcalc.consumption import Consumption
from taxcalc.growdiff import GrowDiff
from taxcalc.growfactors import GrowFactors
from taxcalc.calculator import Calculator
from taxcalc.utils import (delete_file, write_graph_file,
add_quantile_table_row_variable,
unweighted_sum, weighted_sum)
[docs]
class TaxCalcIO():
"""
Constructor for the Tax-Calculator Input-Output class.
TaxCalcIO class constructor call must be followed by init() call.
Parameters
----------
input_data: string or Pandas DataFrame
string is name of INPUT file that is CSV formatted containing
variable names in the Records USABLE_READ_VARS set, or
Pandas DataFrame is INPUT data containing variable names in
the Records USABLE_READ_VARS set. INPUT vsrisbles not in the
Records USABLE_READ_VARS set can be present but are ignored.
tax_year: integer
calendar year for which taxes will be computed for INPUT.
baseline: None or string
None implies baseline policy is current-law policy, or
string is name of optional BASELINE file that is a JSON
reform file.
reform: None or string
None implies no policy reform (current-law policy), or
string is name of optional REFORM file(s).
assump: None or string
None implies economic assumptions are standard assumptions,
or string is name of optional ASSUMP file.
silent: boolean
whether or not to suppress action messages.
Returns
-------
class instance: TaxCalcIO
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, input_data, tax_year, baseline, reform, assump,
silent=True):
# pylint: disable=too-many-arguments,too-many-positional-arguments
# pylint: disable=too-many-branches,too-many-statements,too-many-locals
self.silent = silent
self.gf_reform = None
self.errmsg = ''
# check name and existence of INPUT file
inp = 'x'
self.puf_input_data = False
self.cps_input_data = False
self.tmd_input_data = False
self.tmd_weights = None
self.tmd_gfactor = None
if isinstance(input_data, str):
# remove any leading directory path from INPUT filename
fname = os.path.basename(input_data)
# check if fname ends with ".csv"
if fname.endswith('.csv'):
inp = f'{fname[:-4]}-{str(tax_year)[2:]}'
else:
msg = 'INPUT file name does not end in .csv'
self.errmsg += f'ERROR: {msg}\n'
# check existence of INPUT file
self.puf_input_data = input_data.endswith('puf.csv')
self.cps_input_data = input_data.endswith('cps.csv')
self.tmd_input_data = input_data.endswith('tmd.csv')
if not self.cps_input_data and not os.path.isfile(input_data):
msg = 'INPUT file could not be found'
self.errmsg += f'ERROR: {msg}\n'
# if tmd_input_data is True, construct weights and gfactor paths
if self.tmd_input_data: # pragma: no cover
tmd_dir = os.path.dirname(input_data)
if 'TMD_AREA' in os.environ:
area = os.environ['TMD_AREA']
wfile = f'{area}_tmd_weights.csv.gz'
inp = f'{fname[:-4]}_{area}-{str(tax_year)[2:]}'
else: # using national weights
wfile = 'tmd_weights.csv.gz'
self.tmd_weights = os.path.join(tmd_dir, wfile)
self.tmd_gfactor = os.path.join(tmd_dir, 'tmd_growfactors.csv')
if not os.path.isfile(self.tmd_weights):
msg = f'weights file {self.tmd_weights} could not be found'
self.errmsg += f'ERROR: {msg}\n'
if not os.path.isfile(self.tmd_gfactor):
msg = f'gfactor file {self.tmd_gfactor} could not be found'
self.errmsg += f'ERROR: {msg}\n'
elif isinstance(input_data, pd.DataFrame):
inp = f'df-{str(tax_year)[2:]}'
else:
msg = 'INPUT is neither string nor Pandas DataFrame'
self.errmsg += f'ERROR: {msg}\n'
# check name(s) and existence of BASELINE file(s)
bas = '-x'
if baseline is None:
self.specified_baseline = False
bas = '-#'
elif isinstance(baseline, str):
self.specified_baseline = True
# split any compound baseline into list of simple reforms
basnames = []
baselines = baseline.split('+')
for bas in baselines:
# remove any leading directory path from bas filename
fname = os.path.basename(bas)
# check if fname ends with ".json"
if not fname.endswith('.json'):
msg = f'{fname} does not end in .json'
self.errmsg += f'ERROR: BASELINE file name {msg}\n'
# check existence of BASELINE file
if not os.path.isfile(bas):
msg = f'{bas} could not be found'
self.errmsg += f'ERROR: BASELINE file {msg}\n'
# add fname to list of basnames used in output file names
basnames.append(fname)
# create (possibly compound) baseline name for output file names
bas = '-'
num_basnames = 0
for basname in basnames:
num_basnames += 1
if num_basnames > 1:
bas += '+'
bas += f'{basname[:-5]}'
else:
msg = 'TaxCalcIO.ctor: baseline is neither None nor str'
self.errmsg += f'ERROR: {msg}\n'
# check name(s) and existence of REFORM file(s)
ref = '-x'
if reform is None:
self.specified_reform = False
ref = '-#'
elif isinstance(reform, str):
self.specified_reform = True
# split any compound reform into list of simple reforms
refnames = []
reforms = reform.split('+')
for rfm in reforms:
# remove any leading directory path from rfm filename
fname = os.path.basename(rfm)
# check if fname ends with ".json"
if not fname.endswith('.json'):
msg = f'{fname} does not end in .json'
self.errmsg += f'ERROR: REFORM file name {msg}\n'
# check existence of REFORM file
if not os.path.isfile(rfm):
msg = f'{rfm} could not be found'
self.errmsg += f'ERROR: REFORM file {msg}\n'
# add fname to list of refnames used in output file names
refnames.append(fname)
# create (possibly compound) reform name for output file names
ref = '-'
num_refnames = 0
for refname in refnames:
num_refnames += 1
if num_refnames > 1:
ref += '+'
ref += f'{refname[:-5]}'
else:
msg = 'TaxCalcIO.ctor: reform is neither None nor str'
self.errmsg += f'ERROR: {msg}\n'
# check name and existence of ASSUMP file
asm = '-x'
if assump is None:
asm = '-#'
elif isinstance(assump, str):
# remove any leading directory path from ASSUMP filename
fname = os.path.basename(assump)
# check if fname ends with ".json"
if fname.endswith('.json'):
asm = f'-{fname[:-5]}'
else:
msg = 'ASSUMP file name does not end in .json'
self.errmsg += f'ERROR: {msg}\n'
# check existence of ASSUMP file
if not os.path.isfile(assump):
msg = 'ASSUMP file could not be found'
self.errmsg += f'ERROR: {msg}\n'
else:
msg = 'TaxCalcIO.ctor: assump is neither None nor str'
self.errmsg += f'ERROR: {msg}\n'
# create OUTPUT file name and delete any existing output files
self.output_filename = f'{inp}{bas}{ref}{asm}.xxx'
self.delete_output_files()
# initialize variables whose values are set in init method
self.calc_ref = None
self.calc_bas = None
def delete_output_files(self):
"""
Delete all output files derived from self.output_filename.
"""
extensions = [
'-params.bas',
'-params.ref',
'-tables.text',
'-atr.html',
'-mtr.html',
'-pch.html',
'.db',
]
for ext in extensions:
delete_file(self.output_filename.replace('.xxx', ext))
def init(self, input_data, tax_year, baseline, reform, assump,
aging_input_data, exact_calculations):
"""
TaxCalcIO class post-constructor method that completes initialization.
Parameters
----------
First five are same as the first five of the TaxCalcIO constructor:
input_data, tax_year, baseline, reform, assump.
aging_input_data: boolean
whether or not to extrapolate Records data from data year to
tax_year.
exact_calculations: boolean
specifies whether or not exact tax calculations are done without
any smoothing of "stair-step" provisions in the tax law.
"""
# pylint: disable=too-many-arguments,too-many-positional-arguments
# pylint: disable=too-many-statements,too-many-branches,too-many-locals
self.errmsg = ''
# instantiate base and reform GrowFactors objects
if self.tmd_input_data:
gfactors_bas = GrowFactors(self.tmd_gfactor) # pragma: no cover
gfactors_ref = GrowFactors(self.tmd_gfactor) # pragma: no cover
else:
gfactors_bas = GrowFactors()
gfactors_ref = GrowFactors()
# check tax_year validity
max_tax_year = gfactors_bas.last_year
if tax_year > max_tax_year:
msg = f'TAXYEAR={tax_year} is greater than {max_tax_year}'
self.errmsg += f'ERROR: {msg}\n'
if self.puf_input_data:
min_tax_year = max( # pragma: no cover
Policy.JSON_START_YEAR, Records.PUFCSV_YEAR)
elif self.cps_input_data:
min_tax_year = max(
Policy.JSON_START_YEAR, Records.CPSCSV_YEAR)
elif self.tmd_input_data:
min_tax_year = max( # pragma: no cover
Policy.JSON_START_YEAR, Records.TMDCSV_YEAR)
else:
min_tax_year = Policy.JSON_START_YEAR
if tax_year < min_tax_year:
msg = f'TAXYEAR={tax_year} is less than {min_tax_year}'
self.errmsg += f'ERROR: {msg}\n'
# tax_year out of valid range means cannot proceed with calculations
if self.errmsg:
return
# get assumption sub-dictionaries
assumpdict = Calculator.read_json_param_objects(None, assump)
# get policy parameter dictionaries from --baseline file(s)
poldicts_bas = []
if self.specified_baseline:
for bas in baseline.split('+'):
pdict = Calculator.read_json_param_objects(bas, None)
poldicts_bas.append(pdict['policy'])
# get policy parameter dictionaries from --reform file(s)
poldicts_ref = []
if self.specified_reform:
for ref in reform.split('+'):
pdict = Calculator.read_json_param_objects(ref, None)
poldicts_ref.append(pdict['policy'])
# set last_b_year
last_b_year = max(tax_year, Policy.LAST_BUDGET_YEAR)
# create gdiff_baseline object
gdiff_baseline = GrowDiff(last_budget_year=last_b_year)
try:
gdiff_baseline.update_growdiff(assumpdict['growdiff_baseline'])
except paramtools.ValidationError as valerr_msg:
self.errmsg += str(valerr_msg)
# apply gdiff_baseline to gfactor_bas
gdiff_baseline.apply_to(gfactors_bas)
# specify gdiff_response object
gdiff_response = GrowDiff(last_budget_year=last_b_year)
try:
gdiff_response.update_growdiff(assumpdict['growdiff_response'])
except paramtools.ValidationError as valerr_msg:
self.errmsg += str(valerr_msg)
# apply gdiff_baseline and gdiff_response to gfactor_ref
gdiff_baseline.apply_to(gfactors_ref)
gdiff_response.apply_to(gfactors_ref)
self.gf_reform = copy.deepcopy(gfactors_ref)
# create Policy objects:
# ... the baseline Policy object
if self.specified_baseline:
pol_bas = Policy(
gfactors=gfactors_bas,
last_budget_year=last_b_year,
)
for poldict in poldicts_bas:
try:
pol_bas.implement_reform(
poldict,
print_warnings=True,
raise_errors=False,
)
if self.errmsg:
self.errmsg += "\n"
for _, errors in pol_bas.parameter_errors.items():
self.errmsg += "\n".join(errors)
except paramtools.ValidationError as valerr_msg:
self.errmsg += str(valerr_msg)
else:
pol_bas = Policy(
gfactors=gfactors_bas,
last_budget_year=last_b_year,
)
# ... the reform Policy object
if self.specified_reform:
pol_ref = Policy(
gfactors=gfactors_ref,
last_budget_year=last_b_year,
)
for poldict in poldicts_ref:
try:
pol_ref.implement_reform(
poldict,
print_warnings=True,
raise_errors=False,
)
if self.errmsg:
self.errmsg += "\n"
for _, errors in pol_ref.parameter_errors.items():
self.errmsg += "\n".join(errors)
except paramtools.ValidationError as valerr_msg:
self.errmsg += str(valerr_msg)
else:
pol_ref = Policy(
gfactors=gfactors_bas,
last_budget_year=last_b_year,
)
# create Consumption object
con = Consumption(last_budget_year=last_b_year)
try:
con.update_consumption(assumpdict['consumption'])
except paramtools.ValidationError as valerr_msg:
self.errmsg += str(valerr_msg)
# any errors imply cannot proceed with calculations
if self.errmsg:
return
# set policy to tax_year
pol_ref.set_year(tax_year)
pol_bas.set_year(tax_year)
# read input file contents into Records objects
if aging_input_data:
if self.cps_input_data:
recs_ref = Records.cps_constructor(
gfactors=gfactors_ref,
exact_calculations=exact_calculations,
)
recs_bas = Records.cps_constructor(
gfactors=gfactors_bas,
exact_calculations=exact_calculations,
)
elif self.tmd_input_data: # pragma: no cover
wghts = pd.read_csv(self.tmd_weights)
recs_ref = Records(
data=pd.read_csv(input_data),
start_year=Records.TMDCSV_YEAR,
weights=wghts,
gfactors=gfactors_ref,
adjust_ratios=None,
exact_calculations=exact_calculations,
weights_scale=1.0,
)
recs_bas = Records(
data=pd.read_csv(input_data),
start_year=Records.TMDCSV_YEAR,
weights=wghts,
gfactors=gfactors_bas,
adjust_ratios=None,
exact_calculations=exact_calculations,
weights_scale=1.0,
)
else: # if not {cps|tmd}_input_data but aging_input_data: puf
recs_ref = Records(
data=input_data,
gfactors=gfactors_ref,
exact_calculations=exact_calculations
)
recs_bas = Records(
data=input_data,
gfactors=gfactors_bas,
exact_calculations=exact_calculations
)
else: # input_data are raw data that are not being aged
recs_ref = Records(
data=input_data,
start_year=tax_year,
gfactors=None,
weights=None,
adjust_ratios=None,
exact_calculations=exact_calculations,
)
recs_bas = copy.deepcopy(recs_ref)
# create Calculator objects
self.calc_ref = Calculator(
policy=pol_ref,
records=recs_ref,
verbose=(not self.silent),
consumption=con,
sync_years=aging_input_data,
)
self.calc_bas = Calculator(
policy=pol_bas,
records=recs_bas,
verbose=False,
consumption=con,
sync_years=aging_input_data,
)
[docs]
def tax_year(self):
"""
Return calendar year for which TaxCalcIO calculations are being done.
"""
return self.calc_ref.current_year
[docs]
def output_filepath(self):
"""
Return full path to output file named in TaxCalcIO constructor.
"""
dirpath = os.path.abspath(os.path.dirname(__file__))
return os.path.join(dirpath, self.output_filename)
def advance_to_year(self, year, aging_data):
"""
Update self.output_filename and advance Calculator objects to year.
"""
# update self.output_filename and delete output files
parts = self.output_filename.split('-')
parts[1] = str(year)[2:]
self.output_filename = '-'.join(parts)
self.delete_output_files()
# advance baseline and reform Calculator objects to specified year
self.calc_bas.advance_to_year(year)
self.calc_ref.advance_to_year(year)
idata = 'Advance input data and ' if aging_data else 'Advance'
if not self.silent:
print(f'{idata} policy to {year}')
[docs]
def analyze(
self,
output_params=False,
output_tables=False,
output_graphs=False,
output_dump=False,
dump_varlist=None,
):
"""
Conduct tax analysis.
Parameters
----------
output_params: boolean
whether or not to write baseline and reform policy parameter
values to separate text files
output_tables: boolean
whether or not to generate and write distributional tables
to a text file
output_graphs: boolean
whether or not to generate and write HTML graphs of average
and marginal tax rates by income percentile
output_dump: boolean
whether or not to write SQLite3 database with baseline and
reform tables each containing the variables in dump_varlist.
dump_varlist: list
list of variables to include in dumpdb output;
list must include at least one variable.
Returns
-------
Nothing
"""
# pylint: disable=too-many-arguments,too-many-positional-arguments
# pylint: disable=too-many-branches,too-many-locals
doing_calcs = output_tables or output_graphs or output_dump
# optionally write --params output to text files
if output_params:
self.write_policy_params_files()
if not doing_calcs:
return
# do output calculations
self.calc_bas.calc_all()
self.calc_ref.calc_all()
if output_dump:
assert isinstance(dump_varlist, list)
assert len(dump_varlist) > 0
# might need marginal tax rates for dumpdb
(mtr_ptax_ref, mtr_itax_ref,
_) = self.calc_ref.mtr(wrt_full_compensation=False,
calc_all_already_called=True)
(mtr_ptax_bas, mtr_itax_bas,
_) = self.calc_bas.mtr(wrt_full_compensation=False,
calc_all_already_called=True)
else:
# do not need marginal tax rates for dumpdb
mtr_ptax_ref = None
mtr_itax_ref = None
mtr_ptax_bas = None
mtr_itax_bas = None
# optionally write --tables output to text file
if output_tables:
self.write_tables_file()
# optionally write --graphs output to HTML files
if output_graphs:
self.write_graph_files()
# optionally write --dumpdb output to SQLite database file
if output_dump:
self.write_dumpdb_file(
dump_varlist,
mtr_ptax_ref, mtr_itax_ref,
mtr_ptax_bas, mtr_itax_bas,
)
def write_policy_params_files(self):
"""
Write baseline and reform policy parameter values to separate files.
"""
param_names = Policy.parameter_list()
fname = self.output_filename.replace('.xxx', '-params.bas')
with open(fname, 'w', encoding='utf-8') as pfile:
for pname in param_names:
pval = self.calc_bas.policy_param(pname)
pfile.write(f'{pname} {pval}\n')
if not self.silent:
print( # pragma: no cover
f'Write baseline policy parameter values to file {fname}'
)
fname = self.output_filename.replace('.xxx', '-params.ref')
with open(fname, 'w', encoding='utf-8') as pfile:
for pname in param_names:
pval = self.calc_ref.policy_param(pname)
pfile.write(f'{pname} {pval}\n')
if not self.silent:
print( # pragma: no cover
f'Write reform policy parameter values to file {fname}'
)
[docs]
def write_tables_file(self):
"""
Write tables to text file.
"""
# pylint: disable=too-many-locals
tab_fname = self.output_filename.replace('.xxx', '-tables.text')
# skip tables if there are not some positive weights
if self.calc_bas.total_weight() <= 0.:
with open(tab_fname, 'w', encoding='utf-8') as tfile:
msg = 'No tables because sum of weights is not positive\n'
tfile.write(msg)
return
# create list of results for nontax variables
# - weights don't change with reform
# - expanded_income may change, so always use baseline expanded income
nontax_vars = ['s006', 'expanded_income']
nontax = [self.calc_bas.array(var) for var in nontax_vars]
# create list of results for tax variables from reform Calculator
tax_vars = ['iitax', 'payrolltax', 'lumpsum_tax', 'combined']
reform = [self.calc_ref.array(var) for var in tax_vars]
# create DataFrame with tax distribution under reform
dist = nontax + reform # using expanded_income under baseline policy
all_vars = nontax_vars + tax_vars
distdf = pd.DataFrame(data=np.column_stack(dist), columns=all_vars)
# create DataFrame with tax differences (reform - baseline)
base = [self.calc_bas.array(var) for var in tax_vars]
change = [(reform[idx] - base[idx]) for idx in range(0, len(tax_vars))]
diff = nontax + change # using expanded_income under baseline policy
diffdf = pd.DataFrame(data=np.column_stack(diff), columns=all_vars)
# write each kind of distributional table
with open(tab_fname, 'w', encoding='utf-8') as tfile:
TaxCalcIO.write_decile_table(distdf, tfile, tkind='Reform Totals')
tfile.write('\n')
TaxCalcIO.write_decile_table(diffdf, tfile, tkind='Differences')
# delete intermediate DataFrame objects
del distdf
del diffdf
gc.collect()
if not self.silent:
print( # pragma: no cover
f'Write tabular output to file {tab_fname}'
)
[docs]
@staticmethod
def write_decile_table(dfx, tfile, tkind='Totals'):
"""
Write to tfile the tkind decile table using dfx DataFrame.
"""
dfx = add_quantile_table_row_variable(dfx, 'expanded_income', 10,
decile_details=False,
pop_quantiles=False,
weight_by_income_measure=False)
gdfx = dfx.groupby('table_row', as_index=False, observed=True)
rtns_series = gdfx.apply(
unweighted_sum, 's006', include_groups=False
).values[:, 1]
xinc_series = gdfx.apply(
weighted_sum, 'expanded_income', include_groups=False
).values[:, 1]
itax_series = gdfx.apply(
weighted_sum, 'iitax', include_groups=False
).values[:, 1]
ptax_series = gdfx.apply(
weighted_sum, 'payrolltax', include_groups=False
).values[:, 1]
htax_series = gdfx.apply(
weighted_sum, 'lumpsum_tax', include_groups=False
).values[:, 1]
ctax_series = gdfx.apply(
weighted_sum, 'combined', include_groups=False
).values[:, 1]
# write decile table to text file
row = f'Weighted Tax {tkind} by Baseline Expanded-Income Decile\n'
tfile.write(row)
# pylint: disable=consider-using-f-string
rowfmt = '{}{}{}{}{}{}\n'
row = rowfmt.format(' Returns',
' ExpInc',
' IncTax',
' PayTax',
' LSTax',
' AllTax')
tfile.write(row)
row = rowfmt.format(' (#m)',
' ($b)',
' ($b)',
' ($b)',
' ($b)',
' ($b)')
tfile.write(row)
rowfmt = '{:9.2f}{:10.1f}{:10.1f}{:10.1f}{:10.1f}{:10.1f}\n'
for decile in range(0, 10):
row = f'{decile:2d}'
row += rowfmt.format(rtns_series[decile] * 1e-6,
xinc_series[decile] * 1e-9,
itax_series[decile] * 1e-9,
ptax_series[decile] * 1e-9,
htax_series[decile] * 1e-9,
ctax_series[decile] * 1e-9)
tfile.write(row)
row = ' A'
row += rowfmt.format(rtns_series.sum() * 1e-6,
xinc_series.sum() * 1e-9,
itax_series.sum() * 1e-9,
ptax_series.sum() * 1e-9,
htax_series.sum() * 1e-9,
ctax_series.sum() * 1e-9)
tfile.write(row)
# pylint: enable=consider-using-f-string
del gdfx
del rtns_series
del xinc_series
del itax_series
del ptax_series
del htax_series
del ctax_series
gc.collect()
[docs]
def write_graph_files(self):
"""
Write graphs to HTML files.
All graphs contain same number of filing units in each quantile.
"""
pos_wght_sum = self.calc_ref.total_weight() > 0.0
fig = None
# percentage-aftertax-income-change graph
pch_fname = self.output_filename.replace('.xxx', '-pch.html')
pch_title = 'PCH by Income Percentile'
if pos_wght_sum:
fig = self.calc_bas.pch_graph(self.calc_ref, pop_quantiles=False)
write_graph_file(fig, pch_fname, pch_title)
else:
reason = 'No graph because sum of weights is not positive'
TaxCalcIO.write_empty_graph_file(pch_fname, pch_title, reason)
# average-tax-rate graph
atr_fname = self.output_filename.replace('.xxx', '-atr.html')
atr_title = 'ATR by Income Percentile'
if pos_wght_sum:
fig = self.calc_bas.atr_graph(self.calc_ref, pop_quantiles=False)
write_graph_file(fig, atr_fname, atr_title)
else:
reason = 'No graph because sum of weights is not positive'
TaxCalcIO.write_empty_graph_file(atr_fname, atr_title, reason)
# marginal-tax-rate graph
mtr_fname = self.output_filename.replace('.xxx', '-mtr.html')
mtr_title = 'MTR by Income Percentile'
if pos_wght_sum:
fig = self.calc_bas.mtr_graph(
self.calc_ref,
alt_e00200p_text='Taxpayer Earnings',
pop_quantiles=False
)
write_graph_file(fig, mtr_fname, mtr_title)
else:
reason = 'No graph because sum of weights is not positive'
TaxCalcIO.write_empty_graph_file(mtr_fname, mtr_title, reason)
if fig:
del fig
gc.collect()
if not self.silent:
print( # pragma: no cover
f'Write graphical output to file {pch_fname}\n'
f'Write graphical output to file {atr_fname}\n'
f'Write graphical output to file {mtr_fname}'
)
[docs]
@staticmethod
def write_empty_graph_file(fname, title, reason):
"""
Write HTML graph file with title but no graph for specified reason.
"""
txt = (
'<html>\n'
f'<head><title>{title}</title></head>\n'
f'<body><center<h1>{reason}</h1></center></body>\n'
'</html>\n'
)
with open(fname, 'w', encoding='utf-8') as gfile:
gfile.write(txt)
BASE_DUMPVARS = [
'RECID',
's006',
'data_source',
'XTOT',
'MARS',
'expanded_income',
]
MINIMAL_DUMPVARS = [
'RECID',
'iitax',
]
MTR_DUMPVARS = [
'mtr_itax',
'mtr_ptax',
]
def dump_variables(self, dumpvars_str):
"""
Return list of variable names extracted from dumpvars_str, plus
minimal baseline/reform variables even if not in dumpvars_str.
Also, builds self.errmsg if any specified variables are not valid.
"""
assert isinstance(dumpvars_str, str)
self.errmsg = ''
# change some common non-space delimiter characters into spaces
dumpvars_str = dumpvars_str.replace(',', ' ')
dumpvars_str = dumpvars_str.replace(';', ' ')
dumpvars_str = dumpvars_str.replace('|', ' ')
# split dumpvars_str into a set of dump variables
dumpvars = dumpvars_str.split()
# check that all dumpvars items are valid
recs_vinfo = Records(data=None) # contains records VARINFO only
valid_set = recs_vinfo.USABLE_READ_VARS | recs_vinfo.CALCULATED_VARS
for var in dumpvars:
if var not in valid_set and var not in TaxCalcIO.MTR_DUMPVARS:
msg = f'invalid variable name {var} in DUMPVARS file'
self.errmsg += f'ERROR: {msg}\n'
if self.errmsg:
return []
# construct variable list
dumpvars_list = TaxCalcIO.MINIMAL_DUMPVARS
for var in dumpvars:
if var not in dumpvars_list and var not in TaxCalcIO.BASE_DUMPVARS:
dumpvars_list.append(var)
return dumpvars_list
def write_dumpdb_file(
self,
dump_varlist,
mtr_ptax_ref, mtr_itax_ref,
mtr_ptax_bas, mtr_itax_bas,
):
"""
Write dump output to SQLite database file.
"""
# pylint: disable=too-many-arguments,too-many-positional-arguments
def dump_output(calcx, dumpvars, mtr_itax, mtr_ptax):
"""
Extract dump output from calcx and return it as Pandas DataFrame.
"""
odf = pd.DataFrame()
for var in dumpvars:
if var in TaxCalcIO.MTR_DUMPVARS:
if var == 'mtr_itax':
odf[var] = mtr_itax
elif var == 'mtr_ptax':
odf[var] = mtr_ptax
else:
odf[var] = calcx.array(var)
return odf
# begin main logic
assert isinstance(dump_varlist, list)
assert len(dump_varlist) > 0
db_fname = self.output_filename.replace('.xxx', '.db')
dbcon = sqlite3.connect(db_fname)
# write base table
outdf = pd.DataFrame()
for var in TaxCalcIO.BASE_DUMPVARS:
outdf[var] = self.calc_bas.array(var)
outdf['income_group'] = 0
assert len(outdf.index) == self.calc_bas.array_len
outdf.to_sql('base', dbcon, index=False)
del outdf
# write baseline table
outdf = dump_output(
self.calc_bas, dump_varlist, mtr_itax_bas, mtr_ptax_bas,
)
assert len(outdf.index) == self.calc_bas.array_len
outdf.to_sql('baseline', dbcon, index=False)
del outdf
# write reform table
outdf = dump_output(
self.calc_ref, dump_varlist, mtr_itax_ref, mtr_ptax_ref,
)
assert len(outdf.index) == self.calc_ref.array_len
outdf.to_sql('reform', dbcon, index=False)
del outdf
dbcon.close()
del dbcon
gc.collect()
if not self.silent:
print( # pragma: no cover
f'Write dump output to sqlite3 database file {db_fname}'
)