Coverage for databooks/cli.py: 93%

99 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2022-11-04 16:41 +0000

1"""Main CLI application.""" 

2from itertools import compress 

3from pathlib import Path 

4from typing import List, Optional 

5 

6import tomli 

7from rich.progress import ( 

8 BarColumn, 

9 Progress, 

10 SpinnerColumn, 

11 TextColumn, 

12 TimeElapsedColumn, 

13) 

14from rich.prompt import Confirm 

15from typer import Argument, BadParameter, Context, Exit, Option, Typer, echo 

16 

17from databooks.affirm import affirm_all 

18from databooks.common import expand_paths 

19from databooks.config import TOML_CONFIG_FILE, get_config 

20from databooks.conflicts import conflicts2nbs, path2conflicts 

21from databooks.logging import get_logger 

22from databooks.metadata import clear_all 

23from databooks.recipes import Recipe 

24from databooks.tui import print_nbs 

25from databooks.version import __version__ 

26 

# Module-level logger, created by the project's `get_logger` from this file's path.
logger = get_logger(__file__)

# Typer application on which the callback and commands below are registered.
app = Typer()

30 

31 

32def _version_callback(show_version: bool) -> None: 

33 """Return application version.""" 

34 if show_version: 

35 echo("databooks version: " + __version__) 

36 raise Exit() 

37 

38 

def _help_callback(ctx: Context, show_help: Optional[bool]) -> None:
    """
    Echo the help text of the current command and stop, if help was requested.

    Used as the callback of an eager `--help` option on commands that are
     registered with `add_help_option=False`.

    :param ctx: Context whose command's help text is rendered
    :param show_help: Value of the `--help` flag; nothing happens when falsy
    :raises Exit: After the help text has been echoed
    """
    if not show_help:
        return
    help_text = ctx.command.get_help(ctx)
    echo(help_text)
    raise Exit()

44 

45 

def _config_callback(ctx: Context, config_path: Optional[Path]) -> Optional[Path]:
    """
    Find the configuration file and use its values as defaults for the command.

    When no config path is passed, one is looked up with `get_config` based on the
     `paths` already present in `ctx.params` (if any). The
     `[tool.databooks.<command name>]` table of the file is read, dashes in its
     keys are replaced by underscores and the result is merged into
     `ctx.default_map`, with the values from the file taking precedence.

    :param ctx: Context of the command being invoked
    :param config_path: Explicitly passed configuration file, if any
    :return: The configuration file path that was used (`None` if there is none)
    """
    raw_paths = ctx.params.get("paths", ())
    target_paths = expand_paths(paths=[Path(p).resolve() for p in raw_paths])

    # Only search for a config file when none was given and there is somewhere to look
    if config_path is None and target_paths:
        config_path = get_config(
            target_paths=target_paths,
            config_filename=TOML_CONFIG_FILE,
        )
    logger.debug(f"Loading config file from: {config_path}")

    if config_path is None:  # config may not be specified
        return None

    with config_path.open("rb") as f:
        toml_conf = tomli.load(f)
    conf = toml_conf.get("tool", {}).get("databooks", {}).get(ctx.command.name, {})

    # Merge configuration - keys are normalized to the python parameter names
    overrides = {k.replace("-", "_"): v for k, v in conf.items()}
    ctx.default_map = {**(ctx.default_map or {}), **overrides}
    return config_path

75 

76 

def _check_paths(paths: List[Path], ignore: List[str]) -> List[Path]:
    """
    Validate the passed paths and expand them into notebook file paths.

    :param paths: Paths as passed by the user; each must have either no suffix or
     the `.ipynb` suffix
    :param ignore: Glob expressions forwarded to `expand_paths`
    :return: The non-empty list of paths returned by `expand_paths`
    :raises BadParameter: If any path has a suffix other than `.ipynb`
    :raises Exit: If the expansion yields no paths (after logging so)
    """
    allowed_suffixes = ("", ".ipynb")
    for path in paths:
        if path.suffix not in allowed_suffixes:
            raise BadParameter(
                "Expected either notebook files, a directory or glob expression."
            )

    nb_paths = expand_paths(paths=paths, ignore=ignore)
    if nb_paths:
        return nb_paths

    logger.info(f"No notebooks found in {paths}. Nothing to do.")
    raise Exit()

88 

89 

# Application-level callback: it has no body of its own and only declares the eager
# `--version` option, whose work is done in `_version_callback`.
@app.callback()
def callback(  # noqa: D103
    version: Optional[bool] = Option(
        None,
        "--version",
        is_eager=True,
        callback=_version_callback,
    )
) -> None:
    """CLI tool to resolve git conflicts and remove metadata in notebooks."""

97 

98 

# The default help option is disabled; `--help` is re-declared below as an eager
# option handled by `_help_callback`.
@app.command(add_help_option=False)
def meta(
    # `paths` is eager - presumably so that `_config_callback` can already read it
    # from `ctx.params` when looking for a configuration file
    paths: List[Path] = Argument(..., is_eager=True, help="Path(s) of notebook files"),
    ignore: List[str] = Option(["!*"], help="Glob expression(s) of files to ignore"),
    prefix: str = Option("", help="Prefix to add to filepath when writing files"),
    suffix: str = Option("", help="Suffix to add to filepath when writing files"),
    rm_outs: bool = Option(False, help="Whether to remove cell outputs"),
    rm_exec: bool = Option(True, help="Whether to remove the cell execution counts"),
    nb_meta_keep: List[str] = Option((), help="Notebook metadata fields to keep"),
    cell_meta_keep: List[str] = Option((), help="Cells metadata fields to keep"),
    cell_fields_keep: List[str] = Option(
        (),
        help="Other (excluding `execution_counts` and `outputs`) cell fields to keep",
    ),
    overwrite: bool = Option(False, "--yes", "-y", help="Confirm overwrite of files"),
    check: bool = Option(
        False,
        "--check",
        help="Don't write files but check whether there is unwanted metadata",
    ),
    verbose: bool = Option(
        False, "--verbose", "-v", help="Log processed files in console"
    ),
    config: Optional[Path] = Option(
        None,
        "--config",
        "-c",
        is_eager=True,
        callback=_config_callback,
        resolve_path=True,
        exists=True,
        help="Get CLI options from configuration file",
    ),
    help: Optional[bool] = Option(
        None,
        "--help",
        is_eager=True,
        callback=_help_callback,
        help="Show this message and exit",
    ),
) -> None:
    """Clear both notebook and cell metadata."""
    nb_paths = _check_paths(paths=paths, ignore=ignore)

    # With neither prefix nor suffix the write path equals the read path, so (unless
    # only checking) the user must confirm - via `--yes` or interactively
    writes_in_place = not (prefix + suffix)
    if writes_in_place and not check:
        if not overwrite:
            overwrite = Confirm.ask(
                f"{len(nb_paths)} files will be overwritten"
                " (no prefix nor suffix was passed). Continue?"
            )
        if not overwrite:
            raise Exit()
        logger.warning(f"{len(nb_paths)} files will be overwritten")

    write_paths = [p.parent / f"{prefix}{p.stem}{suffix}{p.suffix}" for p in nb_paths]

    # A field whose "remove" flag is off is added to the fields to keep
    flag_kept = compress(["outputs", "execution_count"], [not rm_outs, not rm_exec])
    cell_fields_keep = [*flag_kept, *cell_fields_keep]

    columns = (
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
    )
    with Progress(*columns) as progress:
        metadata = progress.add_task("[yellow]Removing metadata", total=len(nb_paths))

        def _advance() -> None:
            """Move the progress bar one step forward."""
            progress.update(metadata, advance=1)

        are_equal = clear_all(
            read_paths=nb_paths,
            write_paths=write_paths,
            progress_callback=_advance,
            notebook_metadata_keep=nb_meta_keep,
            cell_metadata_keep=cell_meta_keep,
            cell_fields_keep=cell_fields_keep,
            check=check,
            verbose=verbose,
            overwrite=overwrite,
        )

    if not check:
        logger.info(
            f"The metadata of {sum(not eq for eq in are_equal)} out of {len(are_equal)}"
            " notebooks were removed!"
        )
        return

    if all(are_equal):
        logger.info("No unwanted metadata!")
        return

    # In check mode any notebook that differs makes the command exit with code 1
    logger.info(
        f"Found unwanted metadata in {sum(not eq for eq in are_equal)} out of"
        f" {len(are_equal)} files."
    )
    raise Exit(code=1)

195 

196 

# Exposed on the CLI as `assert`; the default help option is disabled and `--help`
# is re-declared below as an eager option handled by `_help_callback`.
@app.command("assert", add_help_option=False)
def affirm_meta(
    paths: List[Path] = Argument(..., is_eager=True, help="Path(s) of notebook files"),
    ignore: List[str] = Option(["!*"], help="Glob expression(s) of files to ignore"),
    expr: List[str] = Option(
        (), "--expr", "-x", help="Expressions to assert on notebooks"
    ),
    recipe: List[Recipe] = Option(
        (),
        "--recipe",
        "-r",
        help="Common recipes of expressions - see"
        " https://databooks.dev/latest/usage/overview/#recipes",
    ),
    verbose: bool = Option(
        False, "--verbose", "-v", help="Log processed files in console"
    ),
    config: Optional[Path] = Option(
        None,
        "--config",
        "-c",
        is_eager=True,
        callback=_config_callback,
        resolve_path=True,
        exists=True,
        help="Get CLI options from configuration file",
    ),
    help: Optional[bool] = Option(
        None,
        "--help",
        is_eager=True,
        callback=_help_callback,
        help="Show this message and exit",
    ),
) -> None:
    """
    Assert notebook metadata has desired values.

    Pass one (or multiple) strings or recipes. The available variables in scope include
    `nb` (notebook), `raw_cells` (notebook cells of `raw` type), `md_cells` (notebook
    cells of `markdown` type), `code_cells` (notebook cells of `code` type) and
    `exec_cells` (notebook cells of `code` type that were executed - have an `execution
    count` value). Recipes can be found on `databooks.recipes.CookBook`.
    """
    nb_paths = _check_paths(paths=paths, ignore=ignore)

    # Recipes come first (by member name), followed by the free-form expressions
    exprs = [*(r.name for r in recipe), *expr]
    if not exprs:
        raise BadParameter("Must specify at least one of `expr` or `recipe`.")

    columns = (
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
    )
    with Progress(*columns) as progress:
        assert_checks = progress.add_task(
            "[yellow]Running assert checks", total=len(nb_paths)
        )

        def _advance() -> None:
            """Move the progress bar one step forward."""
            progress.update(assert_checks, advance=1)

        are_ok = affirm_all(
            nb_paths=nb_paths,
            progress_callback=_advance,
            exprs=exprs,
            verbose=verbose,
        )

    if not all(are_ok):
        # Any failing notebook makes the command exit with code 1
        logger.info(
            f"Found issues in notebook metadata for {sum(not ok for ok in are_ok)} out"
            f" of {len(are_ok)} notebooks."
        )
        raise Exit(code=1)

    logger.info("All notebooks comply with the desired metadata!")

272 

273 

# The default help option is disabled; `--help` is re-declared below as an eager
# option handled by `_help_callback`.
@app.command(add_help_option=False)
def fix(
    paths: List[Path] = Argument(
        ..., is_eager=True, help="Path(s) of notebook files with conflicts"
    ),
    ignore: List[str] = Option(["!*"], help="Glob expression(s) of files to ignore"),
    metadata_head: bool = Option(
        True, help="Whether or not to keep the metadata from the head/current notebook"
    ),
    cells_head: Optional[bool] = Option(
        None,
        help="Whether to keep the cells from the head/base notebook. Omit to keep both",
    ),
    cell_fields_ignore: List[str] = Option(
        [
            "id",
            "execution_count",
        ],
        help="Cell fields to remove before comparing cells",
    ),
    interactive: bool = Option(
        False,
        "--interactive",
        "-i",
        help="Interactively resolve the conflicts (not implemented)",
    ),
    verbose: bool = Option(
        False, "--verbose", "-v", help="Log processed files in console"
    ),
    config: Optional[Path] = Option(
        None,
        "--config",
        "-c",
        is_eager=True,
        callback=_config_callback,
        resolve_path=True,
        exists=True,
        help="Get CLI options from configuration file",
    ),
    help: Optional[bool] = Option(
        None,
        "--help",
        is_eager=True,
        callback=_help_callback,
        help="Show this message and exit",
    ),
) -> None:
    """
    Fix git conflicts for notebooks.

    Perform by getting the unmerged blobs from git index, comparing them and returning
    a valid notebook summarizing the differences - see
    [git docs](https://git-scm.com/docs/git-ls-files).
    """
    # Unlike the other commands, paths are expanded directly (no suffix validation)
    filepaths = expand_paths(paths=paths, ignore=ignore)
    conflict_files = path2conflicts(nb_paths=filepaths)
    if not conflict_files:
        raise BadParameter(
            f"No conflicts found at {', '.join(str(p) for p in filepaths)}."
        )
    # Checked only after conflicts were found, so a missing conflict is reported first
    if interactive:
        raise NotImplementedError

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
    ) as progress:
        conflicts = progress.add_task(
            "[yellow]Resolving conflicts", total=len(conflict_files)
        )
        conflicts2nbs(
            conflict_files=conflict_files,
            meta_first=metadata_head,
            cells_first=cells_head,
            cell_fields_ignore=cell_fields_ignore,
            verbose=verbose,
            progress_callback=lambda: progress.update(conflicts, advance=1),
        )
    # The message previously ended at the count ("... of 3!"); name what was counted
    logger.info(f"Resolved the conflicts of {len(conflict_files)} files!")

356 

357 

# The default help option is disabled; `--help` is re-declared below as an eager
# option handled by `_help_callback`.
@app.command(add_help_option=False)
def show(
    # Help text aligned with `meta`/`assert`: this command renders any notebook and
    # does not deal with conflicts
    paths: List[Path] = Argument(..., is_eager=True, help="Path(s) of notebook files"),
    ignore: List[str] = Option(["!*"], help="Glob expression(s) of files to ignore"),
    pager: bool = Option(
        False, "--pager", "-p", help="Use pager instead of printing to terminal"
    ),
    # NOTE: `verbose` is accepted but not read in the body of this command
    verbose: bool = Option(
        False, "--verbose", "-v", help="Increase verbosity for debugging"
    ),
    multiple: bool = Option(False, "--yes", "-y", help="Show multiple files"),
    config: Optional[Path] = Option(
        None,
        "--config",
        "-c",
        is_eager=True,
        callback=_config_callback,
        resolve_path=True,
        exists=True,
        help="Get CLI options from configuration file",
    ),
    help: Optional[bool] = Option(
        None,
        "--help",
        is_eager=True,
        callback=_help_callback,
        help="Show this message and exit",
    ),
) -> None:
    """Show rich representation of notebook."""
    nb_paths = _check_paths(paths=paths, ignore=ignore)

    # Showing more than one notebook requires consent, via `--yes` or interactively
    needs_confirmation = len(nb_paths) > 1 and not multiple
    if needs_confirmation and not Confirm.ask(f"Show {len(nb_paths)} notebooks?"):
        raise Exit()

    print_nbs(nb_paths, use_pager=pager)

396 

397 

# Placeholder command: registered on the app, but invoking it always raises.
@app.command()
def diff() -> None:
    """Show differences between notebooks (not implemented)."""
    raise NotImplementedError()