Source code for taxcalc.taxcalcio

"""
Tax-Calculator Input-Output class.
"""
# CODING-STYLE CHECKS:
# pycodestyle taxcalcio.py
# pylint --disable=locally-disabled taxcalcio.py
# pylint: disable=too-many-lines
import os
import gc
import copy
import sqlite3
from pathlib import Path
import numpy as np
import pandas as pd
import paramtools
import behresp
from taxcalc.policy import Policy
from taxcalc.records import Records
from taxcalc.consumption import Consumption
from taxcalc.growdiff import GrowDiff
from taxcalc.growfactors import GrowFactors
from taxcalc.calculator import Calculator
from taxcalc.utils import (json_to_dict, delete_file, write_graph_file,
                           add_quantile_table_row_variable,
                           unweighted_sum, weighted_sum)

TMD_ASSUMES_FULL_CREDIT_CLAIMING = True
TMD_CREDIT_CLAIMING = {
    # IMPORTANT NOTE: when changing either TMDCSV_YEAR or _claim_thd value(s),
    # be sure to update changed info in the Tax-Calculator-LLM CLAUDE.md file.
    'eitc_claim_thd': {f'{Records.TMDCSV_YEAR}': 1600},
    'actc_claim_thd': {f'{Records.TMDCSV_YEAR}': 1500},
}


[docs] class TaxCalcIO(): """ Constructor for the Tax-Calculator Input-Output class. TaxCalcIO class constructor call must be followed by init() call. Parameters ---------- input_data: string or Pandas DataFrame string is name of INPUT file that is CSV formatted containing variable names in the Records USABLE_READ_VARS set, or Pandas DataFrame is INPUT data containing variable names in the Records USABLE_READ_VARS set. INPUT vsrisbles not in the Records USABLE_READ_VARS set can be present but are ignored. tax_year: integer calendar year for which taxes will be computed for INPUT. baseline: None or string None implies baseline policy is current-law policy, or string is name of optional BASELINE file that is a JSON reform file. reform: None or string None implies no policy reform (current-law policy), or string is name of optional REFORM file(s). assump: None or string None implies economic assumptions are standard assumptions, or string is name of optional ASSUMP file. behavior: None or string None implies behavioral response elasticities are all zero, or string is name of optional BEHAVIOR file. runid: int run id value to use for simpler output file names silent: boolean whether or not to suppress action messages. Returns ------- class instance: TaxCalcIO """ # pylint: disable=too-many-instance-attributes def __init__(self, input_data, tax_year, baseline, reform, assump, behavior, runid=0, silent=True): # pylint: disable=too-many-arguments,too-many-positional-arguments # pylint: disable=too-many-branches,too-many-statements,too-many-locals self.silent = silent self.gf_reform = None self.errmsg = '' # check name and existence of INPUT file inp = 'x' self.cps_input_data = False self.puf_input_data = False self.tmd_input_data = False if isinstance(input_data, str): # remove any leading directory path from INPUT filename fname = os.path.basename(input_data) # check if fname ends with ".csv" if fname.endswith('.csv'): inp = f'{fname[:-4]}-{str(tax_year)[2:]}' else: msg = 'INPUT file name does not end in .csv' self.errmsg += f'ERROR: {msg}\n' # check existence of INPUT file self.cps_input_data = input_data.endswith('cps.csv') self.puf_input_data = input_data.endswith('puf.csv') self.tmd_input_data = input_data.endswith('tmd.csv') if not self.cps_input_data and not os.path.isfile(input_data): msg = 'INPUT file could not be found' self.errmsg += f'ERROR: {msg}\n' # if puf_input_data is True, construct weights and ratios paths if self.puf_input_data: # pragma: no cover puf_dir = os.path.dirname(input_data) self.puf_weights = os.path.join(puf_dir, 'puf_weights.csv.gz') self.puf_ratios = os.path.join(puf_dir, 'puf_ratios.csv') if not os.path.isfile(self.puf_weights): msg = f'weights file {self.puf_weights} could not be found' self.errmsg += f'ERROR: {msg}\n' if not os.path.isfile(self.puf_ratios): msg = f'gfactor file {self.puf_ratios} could not be found' self.errmsg += f'ERROR: {msg}\n' # if tmd_input_data is True, construct weights and gfactor paths if self.tmd_input_data: # pragma: no cover tmd_dir = os.path.dirname(input_data) if 'TMD_AREA' in os.environ: area = os.environ['TMD_AREA'] wfile = f'{area}_tmd_weights.csv.gz' inp = f'{fname[:-4]}_{area}-{str(tax_year)[2:]}' else: # using national weights wfile = 'tmd_weights.csv.gz' self.tmd_weights = os.path.join(tmd_dir, wfile) self.tmd_gfactor = os.path.join(tmd_dir, 'tmd_growfactors.csv') if not os.path.isfile(self.tmd_weights): msg = f'weights file {self.tmd_weights} could not be found' self.errmsg += f'ERROR: {msg}\n' if not os.path.isfile(self.tmd_gfactor): msg = f'gfactor file {self.tmd_gfactor} could not be found' self.errmsg += f'ERROR: {msg}\n' elif isinstance(input_data, pd.DataFrame): inp = f'df-{str(tax_year)[2:]}' else: msg = 'INPUT is neither string nor Pandas DataFrame' self.errmsg += f'ERROR: {msg}\n' # check name(s) and existence of BASELINE file(s) if baseline is None: self.specified_baseline = False bas = '-#' elif isinstance(baseline, str): self.specified_baseline = True bas = self._check_policy_files(baseline, 'BASELINE') else: bas = '-x' msg = 'TaxCalcIO.ctor: baseline is neither None nor str' self.errmsg += f'ERROR: {msg}\n' # check name(s) and existence of REFORM file(s) if reform is None: self.specified_reform = False ref = '-#' elif isinstance(reform, str): self.specified_reform = True ref = self._check_policy_files(reform, 'REFORM') else: ref = '-x' msg = 'TaxCalcIO.ctor: reform is neither None nor str' self.errmsg += f'ERROR: {msg}\n' # check name and existence of ASSUMP file if assump is None: asm = '-#' elif isinstance(assump, str): asm = self._check_single_json_file(assump, 'ASSUMP') else: asm = '-x' msg = 'TaxCalcIO.ctor: assump is neither None nor str' self.errmsg += f'ERROR: {msg}\n' # check name and existence of BEHAVIOR file self.behvdict = None if behavior is None: beh = '-#' elif isinstance(behavior, str): beh = self._check_single_json_file(behavior, 'BEHAVIOR') else: beh = '-x' msg = 'TaxCalcIO.ctor: behavior is neither None nor str' self.errmsg += f'ERROR: {msg}\n' # create OUTPUT file name and delete any existing output files self.output_filename = f'{inp}{bas}{ref}{asm}{beh}.xxx' self.runid = runid if runid > 0: self.output_filename = f'run{runid}-{str(tax_year)[2:]}.xxx' self.delete_output_files() # initialize variables whose values are set in init method self.pol_ref = None self.pol_bas = None self.recs_ref = None self.recs_bas = None self.con = None self.aging_input_data = None self.calc_ref = None self.calc_bas = None def delete_output_files(self): """ Delete all output files derived from self.output_filename. """ extensions = [ '-params.baseline', '-params.reform', '.tables', '-atr.html', '-mtr.html', '-chg.html', '.dumpdb', ] for ext in extensions: delete_file(self.output_filename.replace('.xxx', ext)) def init(self, input_data, tax_year, baseline, reform, assump, behavior, exact_calculations): """ TaxCalcIO class post-constructor method that completes initialization. Parameters ---------- First six are same as the first six of the TaxCalcIO constructor: input_data, tax_year, baseline, reform, assump, behavior. exact_calculations: boolean specifies whether or not exact tax calculations are done without any smoothing of "stair-step" provisions in the tax law. """ # pylint: disable=too-many-arguments,too-many-positional-arguments # pylint: disable=too-many-statements,too-many-branches,too-many-locals self.errmsg = '' # instantiate base/reform GrowFactors objects used for param indexing policy_gfactors_bas = GrowFactors() policy_gfactors_ref = GrowFactors() # instantiate base/reform GrowFactors objects used to extrapolate data if self.tmd_input_data: gfactors_bas = GrowFactors(self.tmd_gfactor) # pragma: no cover gfactors_ref = GrowFactors(self.tmd_gfactor) # pragma: no cover else: gfactors_bas = GrowFactors() gfactors_ref = GrowFactors() # check tax_year validity max_tax_year = gfactors_bas.last_year if tax_year > max_tax_year: msg = f'TAXYEAR={tax_year} is greater than {max_tax_year}' self.errmsg += f'ERROR: {msg}\n' if self.puf_input_data: min_tax_year = max( # pragma: no cover Policy.JSON_START_YEAR, Records.PUFCSV_YEAR) elif self.cps_input_data: min_tax_year = max( Policy.JSON_START_YEAR, Records.CPSCSV_YEAR) elif self.tmd_input_data: min_tax_year = max( # pragma: no cover Policy.JSON_START_YEAR, Records.TMDCSV_YEAR) else: min_tax_year = Policy.JSON_START_YEAR if tax_year < min_tax_year: msg = f'TAXYEAR={tax_year} is less than {min_tax_year}' self.errmsg += f'ERROR: {msg}\n' # tax_year out of valid range means cannot proceed with calculations if self.errmsg: return # get assumption sub-dictionaries assumpdict = Calculator.read_json_param_objects(None, assump) # get behavior dictionary if behavior: with open(behavior, 'r', encoding='utf-8') as jfile: json_text = jfile.read() try: self.behvdict = json_to_dict(json_text) except ValueError as valerr: # pragma: no cover msg = f'{behavior} contains invalid JSON' self.errmsg = f'ERROR: BEHAVIOR file {msg}\n' self.errmsg += f'{valerr}' return # check behavior response elasticity names and values elasticity_set = set(self.behvdict.keys()) if elasticity_set != set(['sub', 'inc', 'cg']): msg = f'{behavior} contains extra or missing elasticities' self.errmsg = f'ERROR: BEHAVIOR file {msg}\n' self.errmsg += 'Valid elasticities are "sub", "inc", "cg"' return if self.behvdict['sub'] < 0.0: msg = f'{behavior} contains negative "sub" elasticity' self.errmsg = f'ERROR: BEHAVIOR file {msg}\n' if self.behvdict['inc'] > 0.0: msg = f'{behavior} contains positive "inc" elasticity' self.errmsg += f'ERROR: BEHAVIOR file {msg}\n' if self.behvdict['cg'] > 0.0: msg = f'{behavior} contains positive "cg" elasticity' self.errmsg += f'ERROR: BEHAVIOR file {msg}\n' if self.errmsg: return # get policy parameter dictionaries from --baseline file(s) poldicts_bas = [] if self.specified_baseline: for bas in baseline.split('+'): pdict = Calculator.read_json_param_objects(bas, None) poldicts_bas.append(pdict['policy']) # get policy parameter dictionaries from --reform file(s) poldicts_ref = [] if self.specified_reform: for ref in reform.split('+'): pdict = Calculator.read_json_param_objects(ref, None) poldicts_ref.append(pdict['policy']) # set last_b_year last_b_year = max(tax_year, Policy.LAST_BUDGET_YEAR) # create gdiff_baseline object gdiff_baseline = GrowDiff(last_budget_year=last_b_year) try: gdiff_baseline.update_growdiff(assumpdict['growdiff_baseline']) except paramtools.ValidationError as valerr_msg: self.errmsg += str(valerr_msg) # apply gdiff_baseline to gfactor_bas gdiff_baseline.apply_to(gfactors_bas) # specify gdiff_response object gdiff_response = GrowDiff(last_budget_year=last_b_year) try: gdiff_response.update_growdiff(assumpdict['growdiff_response']) except paramtools.ValidationError as valerr_msg: self.errmsg += str(valerr_msg) # apply gdiff_baseline and gdiff_response to gfactor_ref gdiff_baseline.apply_to(gfactors_ref) gdiff_response.apply_to(gfactors_ref) self.gf_reform = copy.deepcopy(gfactors_ref) # create Policy objects: # ... apply gdiff_baseline to policy_gfactor_baseline gdiff_baseline.apply_to(policy_gfactors_bas) # ... apply gdiff_baseline and gdiff_response to policy_gfactor_ref gdiff_baseline.apply_to(policy_gfactors_ref) gdiff_response.apply_to(policy_gfactors_ref) # ... the baseline Policy object self.pol_bas = self._make_policy(policy_gfactors_bas, last_b_year) if self.specified_baseline: self._apply_poldicts(self.pol_bas, poldicts_bas) # ... the reform Policy object (no reform implies reform == baseline) if self.specified_reform: self.pol_ref = self._make_policy(policy_gfactors_ref, last_b_year) self._apply_poldicts(self.pol_ref, poldicts_ref) else: self.pol_ref = self._make_policy(policy_gfactors_bas, last_b_year) # create Consumption object self.con = Consumption(last_budget_year=last_b_year) try: self.con.update_consumption(assumpdict['consumption']) except paramtools.ValidationError as valerr_msg: self.errmsg += str(valerr_msg) # any errors imply cannot proceed with calculations if self.errmsg: return # set policy to tax_year self.pol_ref.set_year(tax_year) self.pol_bas.set_year(tax_year) # read input file contents into Records objects self.aging_input_data = ( self.cps_input_data or self.puf_input_data or self.tmd_input_data ) if self.aging_input_data: self.recs_ref = self._make_records( gfactors_ref, input_data, tax_year, exact_calculations, ) self.recs_bas = self._make_records( gfactors_bas, input_data, tax_year, exact_calculations, ) # extrapolate input data to tax_year while self.recs_ref.current_year < tax_year: self.recs_ref.increment_year() self.recs_bas.increment_year() else: # input_data are raw data that are not being aged self.recs_ref = self._make_records( None, input_data, tax_year, exact_calculations, ) self.recs_bas = copy.deepcopy(self.recs_ref) # create Calculator objects self.calc_ref = self._make_calculator( self.pol_ref, self.recs_ref, not self.silent, ) self.calc_bas = self._make_calculator( self.pol_bas, self.recs_bas, False, )
[docs] def tax_year(self): """ Return calendar year for which TaxCalcIO calculations are being done. """ return self.calc_ref.current_year
[docs] def output_filepath(self): """ Return full path to output file named in TaxCalcIO constructor. """ dirpath = os.path.abspath(os.path.dirname(__file__)) return os.path.join(dirpath, self.output_filename)
def advance_to_year(self, year): """ Update self.output_filename and create Calculator objects for year. """ # update self.output_filename and delete output files parts = self.output_filename.split('-') if self.runid == 0: # if using legacy output file names parts[1] = str(year)[2:] else: # if using simpler output file names (runN-YY.xxx) subparts = parts[1].split('.') subparts[0] = str(year)[2:] parts[1] = '.'.join(subparts) self.output_filename = '-'.join(parts) self.delete_output_files() # create baseline and reform Calculator objects for specified year # ... set policy for year self.pol_ref.set_year(year) self.pol_bas.set_year(year) # ... set consumption for year self.con.set_year(year) # ... increment records to year self.recs_ref.increment_year() self.recs_bas.increment_year() # ... delete old and create new Calculator objects del self.calc_ref self.calc_ref = self._make_calculator( self.pol_ref, self.recs_ref, False, ) del self.calc_bas self.calc_bas = self._make_calculator( self.pol_bas, self.recs_bas, False, ) # report advance to new year aging_data = ( self.cps_input_data or self.puf_input_data or self.tmd_input_data ) idata = 'Advance input data and' if aging_data else 'Advance' if not self.silent: print(f'{idata} policy to {year}')
[docs] def analyze( self, output_params=False, output_tables=False, output_graphs=False, output_dump=False, dump_varlist=None, ): """ Conduct tax analysis. Parameters ---------- output_params: boolean whether or not to write baseline and reform policy parameter values to separate text files output_tables: boolean whether or not to generate and write distributional tables to a text file output_graphs: boolean whether or not to generate and write HTML graphs of average and marginal tax rates by income percentile output_dump: boolean whether or not to write SQLite3 database with baseline and reform tables each containing the variables in dump_varlist. dump_varlist: list list of variables to include in dumpdb output; list must include at least one variable. Returns ------- Nothing """ # pylint: disable=too-many-arguments,too-many-positional-arguments # pylint: disable=too-many-branches,too-many-locals doing_calcs = output_tables or output_graphs or output_dump # optionally write --params output to text files if output_params: self.write_policy_params_files() if not doing_calcs: return # do output calculations if self.behvdict: # if assuming behavioral responses br_dump_bas, br_dump_ref = behresp.response( self.calc_bas, self.calc_ref, self.behvdict, dump=True, ) # copy returned dump dataframe values back into calc objects self._copy_dump_into_calc(self.calc_bas, br_dump_bas) del br_dump_bas self._copy_dump_into_calc(self.calc_ref, br_dump_ref) del br_dump_ref else: # if assuming no behavioral responses self.calc_bas.calc_all() self.calc_ref.calc_all() # handle MTR output variables mtr_ptax_bas = None mtr_itax_bas = None mtr_ptax_ref = None mtr_itax_ref = None if output_dump: assert isinstance(dump_varlist, list) assert len(dump_varlist) > 0 mtr_output = ( 'mtr_itax' in dump_varlist or 'mtr_ptax' in dump_varlist ) if mtr_output: (mtr_ptax_bas, mtr_itax_bas, _) = self.calc_bas.mtr( wrt_full_compensation=False, calc_all_already_called=True) (mtr_ptax_ref, mtr_itax_ref, _) = self.calc_ref.mtr( wrt_full_compensation=False, calc_all_already_called=True) # optionally write --tables output to text file if output_tables: self._write_tables_file() # optionally write --graphs output to HTML files if output_graphs: self._write_graph_files() # optionally write --dumpdb output to SQLite database file if output_dump: self._write_dumpdb_file( dump_varlist, mtr_ptax_ref, mtr_itax_ref, mtr_ptax_bas, mtr_itax_bas, )
def write_policy_params_files(self): """ Write baseline and reform policy parameter values to separate files. """ self._write_params(self.calc_bas, '-params.baseline', 'baseline') self._write_params(self.calc_ref, '-params.reform', 'reform') BASE_DUMPVARS = [ 'RECID', 's006', 'data_source', 'XTOT', 'MARS', 'expanded_income', ] MINIMAL_DUMPVARS = [ 'RECID', 'iitax', ] MTR_DUMPVARS = [ 'mtr_itax', 'mtr_ptax', ] def dump_variables(self, dumpvars_str): """ Return list of variable names extracted from dumpvars_str, plus minimal baseline/reform variables even if not in dumpvars_str. Also, builds self.errmsg if any specified variables are not valid. """ assert isinstance(dumpvars_str, str) self.errmsg = '' # get read and calc Records variables recs_vinfo = Records(data=None) # contains records VARINFO only valid_set = recs_vinfo.USABLE_READ_VARS | recs_vinfo.CALCULATED_VARS # construct dumpvars list if dumpvars_str == 'ALL': dumpvars = list(valid_set) + TaxCalcIO.MTR_DUMPVARS else: # ... change some common non-space delimiter characters into spaces dumpvars_str = dumpvars_str.replace(',', ' ') dumpvars_str = dumpvars_str.replace(';', ' ') dumpvars_str = dumpvars_str.replace('|', ' ') # ... split dumpvars_str into dumpvars list dumpvars = dumpvars_str.split() # ... check that all dumpvars items are valid for var in dumpvars: if var not in valid_set and var not in TaxCalcIO.MTR_DUMPVARS: msg = f'invalid variable name {var} in DUMPVARS file' self.errmsg += f'ERROR: {msg}\n' if self.errmsg: return [] # construct variable list dumpvars_list = list(TaxCalcIO.MINIMAL_DUMPVARS) for var in dumpvars: if var not in dumpvars_list and var not in TaxCalcIO.BASE_DUMPVARS: dumpvars_list.append(var) return dumpvars_list # --- Begin private methods of the TaxCalcIO class --- # def _check_policy_files(self, filespec, label): """ Check validity of the (possibly compound) BASELINE or REFORM filespec, appending any errors to self.errmsg, and return the name fragment used in constructing output file names. """ names = [] for path in filespec.split('+'): # remove any leading directory path from filename fname = os.path.basename(path) # check if fname ends with ".json" if not fname.endswith('.json'): self.errmsg += ( f'ERROR: {label} file name {fname} does not end in .json\n' ) # check existence of file if os.path.isfile(path): # check validity of JSON text with open(path, 'r', encoding='utf-8') as jfile: json_text = jfile.read() try: _ = json_to_dict(json_text) except ValueError as valerr: # pragma: no cover msg = f'{path} contains invalid JSON' self.errmsg += f'ERROR: {label} file {msg}\n' self.errmsg += f'{valerr}' else: msg = f'{path} could not be found' self.errmsg += f'ERROR: {label} file {msg}\n' # add fname to list of names used in output file names names.append(fname) # return (possibly compound) name fragment for output file names return '-' + '+'.join(name[:-5] for name in names) def _check_single_json_file(self, path, label): """ Check name and existence of the single ASSUMP or BEHAVIOR file, appending any errors to self.errmsg, and return the name fragment used in constructing output file names. """ # remove any leading directory path from filename fname = os.path.basename(path) # check if fname ends with ".json" if fname.endswith('.json'): fragment = f'-{fname[:-5]}' else: fragment = '-x' self.errmsg += f'ERROR: {label} file name does not end in .json\n' # check existence of file if not os.path.isfile(path): self.errmsg += f'ERROR: {label} file could not be found\n' return fragment def _make_policy(self, policy_gfactors, last_b_year): """ Return Policy object that uses the specified growfactors and that optionally implements TMD credit-claiming thresholds. """ pol = Policy( gfactors=policy_gfactors, last_budget_year=last_b_year, ) if self.tmd_input_data: # pragma: no cover if not TMD_ASSUMES_FULL_CREDIT_CLAIMING: pol.implement_reform(TMD_CREDIT_CLAIMING) return pol def _apply_poldicts(self, pol, poldicts): """ Implement each reform dict in poldicts on the pol Policy object, appending any parameter errors to self.errmsg. """ for poldict in poldicts: try: pol.implement_reform( poldict, print_warnings=True, raise_errors=False, ) if self.errmsg: self.errmsg += '\n' for _, errors in pol.parameter_errors.items(): self.errmsg += '\n'.join(errors) except paramtools.ValidationError as valerr_msg: self.errmsg += str(valerr_msg) def _make_records(self, gfactors, input_data, tax_year, exact_calculations): """ Construct and return a Records object using the specified gfactors and the input data type implied by the constructor arguments. """ # pylint: disable=too-many-arguments,too-many-positional-arguments if self.cps_input_data: return Records.cps_constructor( gfactors=gfactors, exact_calculations=exact_calculations, ) if self.puf_input_data: # pragma: no cover return Records.puf_constructor( data=input_data, gfactors=gfactors, weights=self.puf_weights, ratios=self.puf_ratios, exact_calculations=exact_calculations, ) if self.tmd_input_data: # pragma: no cover return Records.tmd_constructor( data_path=Path(input_data), weights_path=Path(self.tmd_weights), growfactors=gfactors, exact_calculations=exact_calculations, ) # input_data are raw data that are not being aged return Records( data=input_data, start_year=tax_year, gfactors=None, weights=None, adjust_ratios=None, exact_calculations=exact_calculations, ) def _make_calculator(self, policy, records, verbose): """ Construct and return a Calculator object from the specified policy and records objects. """ return Calculator( policy=policy, records=records, verbose=verbose, consumption=self.con, sync_years=self.aging_input_data, ) def _copy_dump_into_calc(self, calc, br_dump): """ Copy behavioral-response dump DataFrame values back into the calc object, skipping the marginal-tax-rate columns. """ int_variables = self.recs_bas.INTEGER_VARS vnames = list(br_dump.columns) for mtr_vname in ['mtr_ptax', 'mtr_itax', 'mtr_combined']: if mtr_vname in vnames: vnames.remove(mtr_vname) for vname in vnames: if vname in int_variables: vdtype = np.int32 else: vdtype = np.float64 calc.array( vname, br_dump[vname].to_numpy(dtype=vdtype, copy=True) ) def _write_params(self, calc, ext, label): """ Write policy parameter values from calc to the ext output file. """ year = calc.current_year fname = self.output_filename.replace('.xxx', ext) with open(fname, 'w', encoding='utf-8') as pfile: for pname in Policy.parameter_list(): pval = calc.policy_param(pname) pfile.write(f'{year} {pname} {pval}\n') if not self.silent: print( # pragma: no cover f'Write {label} policy parameter values to file {fname}' ) def _write_tables_file(self): """ Write tables to text file. """ # pylint: disable=too-many-locals tab_fname = self.output_filename.replace('.xxx', '.tables') # skip tables if there are not some positive weights if self.calc_bas.total_weight() <= 0.: with open(tab_fname, 'w', encoding='utf-8') as tfile: msg = 'No tables because sum of weights is not positive\n' tfile.write(msg) return # create list of results for nontax variables # - weights don't change with reform # - expanded_income may change, so always use baseline expanded income nontax_vars = ['s006', 'expanded_income'] nontax = [self.calc_bas.array(var) for var in nontax_vars] # create list of results for tax variables from reform Calculator tax_vars = ['iitax', 'payrolltax', 'lumpsum_tax', 'combined'] reform = [self.calc_ref.array(var) for var in tax_vars] # create DataFrame with tax distribution under reform dist = nontax + reform # using expanded_income under baseline policy all_vars = nontax_vars + tax_vars distdf = pd.DataFrame(data=np.column_stack(dist), columns=all_vars) # create DataFrame with tax differences (reform - baseline) base = [self.calc_bas.array(var) for var in tax_vars] change = [(reform[idx] - base[idx]) for idx in range(0, len(tax_vars))] diff = nontax + change # using expanded_income under baseline policy diffdf = pd.DataFrame(data=np.column_stack(diff), columns=all_vars) # write each kind of distributional table year = self.calc_bas.current_year with open(tab_fname, 'w', encoding='utf-8') as tfile: TaxCalcIO._write_decile_table( distdf, tfile, year, tkind='Reform Totals', ) tfile.write('\n') TaxCalcIO._write_decile_table( diffdf, tfile, year, tkind='Differences', ) # delete intermediate DataFrame objects del distdf del diffdf gc.collect() if not self.silent: print( # pragma: no cover f'Write tabular output to file {tab_fname}' ) @staticmethod def _write_decile_table(dfx, tfile, year, tkind='Totals'): """ Write to tfile the tkind decile table using dfx DataFrame. """ dfx = add_quantile_table_row_variable(dfx, 'expanded_income', 10, decile_details=False, pop_quantiles=False, weight_by_income_measure=False) # each table column: variable, aggregator, scale, header, units wsum = weighted_sum usum = unweighted_sum columns = [ ('s006', usum, 1e-6, ' Returns', ' (#m)'), ('expanded_income', wsum, 1e-9, ' ExpInc', ' ($b)'), ('iitax', wsum, 1e-9, ' IncTax', ' ($b)'), ('payrolltax', wsum, 1e-9, ' PayTax', ' ($b)'), ('lumpsum_tax', wsum, 1e-9, ' LSTax', ' ($b)'), ('combined', wsum, 1e-9, ' AllTax', ' ($b)'), ] gdfx = dfx.groupby('table_row', as_index=False, observed=True) series = [ gdfx.apply(agg, var, include_groups=False).values[:, 1] for var, agg, _, _, _ in columns ] scales = [scale for _, _, scale, _, _ in columns] # write decile table to text file row = ( f'Weighted Tax {tkind} by ' f'Baseline Expanded-Income Decile for {year}\n' ) tfile.write(row) # pylint: disable=consider-using-f-string rowfmt = '{}{}{}{}{}{}\n' tfile.write(rowfmt.format(*[header for _, _, _, header, _ in columns])) tfile.write(rowfmt.format(*[units for _, _, _, _, units in columns])) rowfmt = '{:9.2f}{:10.1f}{:10.1f}{:10.1f}{:10.1f}{:10.1f}\n' for decile in range(0, 10): row = f'{decile:2d}' row += rowfmt.format(*[ ser[decile] * scl for ser, scl in zip(series, scales) ]) tfile.write(row) row = ' A' row += rowfmt.format(*[ ser.sum() * scl for ser, scl in zip(series, scales) ]) tfile.write(row) # pylint: enable=consider-using-f-string del gdfx del series gc.collect() def _write_graph_files(self): """ Write graphs to HTML files. All graphs contain same number of filing units in each quantile. """ pos_wght_sum = self.calc_ref.total_weight() > 0.0 # each graph is specified by output-file suffix, title, and builder graph_specs = [ ('-chg.html', 'CHG by Income Percentile', lambda: self.calc_bas.pch_graph( self.calc_ref, pop_quantiles=False)), ('-atr.html', 'ATR by Income Percentile', lambda: self.calc_bas.atr_graph( self.calc_ref, pop_quantiles=False)), ('-mtr.html', 'MTR by Income Percentile', lambda: self.calc_bas.mtr_graph( self.calc_ref, alt_e00200p_text='Taxpayer Earnings', pop_quantiles=False)), ] fnames = [] for suffix, title, build_graph in graph_specs: fname = self.output_filename.replace('.xxx', suffix) fnames.append(fname) if pos_wght_sum: fig = build_graph() write_graph_file(fig, fname, title) del fig gc.collect() else: reason = 'No graph because sum of weights is not positive' TaxCalcIO._write_empty_graph_file(fname, title, reason) if not self.silent: print( # pragma: no cover f'Write graphical output to file {fnames[0]}\n' f'Write graphical output to file {fnames[1]}\n' f'Write graphical output to file {fnames[2]}' ) @staticmethod def _write_empty_graph_file(fname, title, reason): """ Write HTML graph file with title but no graph for specified reason. """ txt = ( '<html>\n' f'<head><title>{title}</title></head>\n' f'<body><center<h1>{reason}</h1></center></body>\n' '</html>\n' ) with open(fname, 'w', encoding='utf-8') as gfile: gfile.write(txt) def _write_dumpdb_file( self, dump_varlist, mtr_ptax_ref, mtr_itax_ref, mtr_ptax_bas, mtr_itax_bas, ): """ Write dump output to SQLite database file. """ # pylint: disable=too-many-arguments,too-many-positional-arguments def _dump_output(calcx, dumpvars, mtr_itax, mtr_ptax): """ Extract dump output from calcx and return it as Pandas DataFrame. """ odict = {} for var in dumpvars: if var == 'mtr_itax': odict[var] = pd.Series(mtr_itax) elif var == 'mtr_ptax': odict[var] = pd.Series(mtr_ptax) else: odict[var] = pd.Series(calcx.array(var)) odf = pd.concat(odict, axis=1) del odict return odf # begin main logic assert isinstance(dump_varlist, list) assert len(dump_varlist) > 0 db_fname = self.output_filename.replace('.xxx', '.dumpdb') dbcon = sqlite3.connect(db_fname) # write base table outdf = pd.DataFrame() for var in TaxCalcIO.BASE_DUMPVARS: outdf[var] = self.calc_bas.array(var) expanded_income_bin_edges = [ # default income_group definition -1e300, # essentially -infinity 50e3, 100e3, 250e3, 500e3, 1e6, 1e300, # essentially +infinity ] outdf['income_group'] = 1 + pd.cut( # default base.income_group values outdf['expanded_income'], expanded_income_bin_edges, right=False, # bins are defined as [lo_edge, hi_edge) labels=False, # pd.cut returns bins numbered 0,1,2,... ) assert len(outdf.index) == self.calc_bas.array_len outdf.to_sql('base', dbcon, index=False) del outdf # write income_group_definition table num_groups = len(expanded_income_bin_edges) - 1 outdf = pd.DataFrame() outdf['income_group'] = np.array([ 1 + grp for grp in range(0, num_groups) ]) outdf['income_lower'] = np.array(expanded_income_bin_edges[:-1]) outdf['income_up_to'] = np.array(expanded_income_bin_edges[1:]) assert len(outdf.index) == num_groups outdf.to_sql('income_group_definition', dbcon, index=False) del outdf # write baseline table outdf = _dump_output( self.calc_bas, dump_varlist, mtr_itax_bas, mtr_ptax_bas, ) assert len(outdf.index) == self.calc_bas.array_len outdf.to_sql('baseline', dbcon, index=False) del outdf # write reform table outdf = _dump_output( self.calc_ref, dump_varlist, mtr_itax_ref, mtr_ptax_ref, ) assert len(outdf.index) == self.calc_ref.array_len outdf.to_sql('reform', dbcon, index=False) del outdf dbcon.close() del dbcon gc.collect() if not self.silent: print( # pragma: no cover f'Write dump output to sqlite3 database file {db_fname}' )