tests for code context extractor
This commit is contained in:
parent
cf86cad999
commit
7cda6aafa7
2 changed files with 413 additions and 2 deletions
@@ -1311,6 +1311,409 @@ def test_direct_module_import() -> None:
        ending_line=None,
    )

    code_ctx = get_code_optimization_context(function_to_optimize, project_root)
    read_write_context, read_only_context = code_ctx.read_writable_code, code_ctx.read_only_context_code
    print(read_only_context.strip())

    expected_read_only_context = """
```python:utils.py
from transform_utils import DataTransformer


class DataProcessor:
    \"\"\"A class for processing data.\"\"\"

    number = 1

    def __repr__(self) -> str:
        \"\"\"Return a string representation of the DataProcessor.\"\"\"
        return f"DataProcessor(default_prefix={self.default_prefix!r})"

    def process_data(self, raw_data: str) -> str:
        \"\"\"Process raw data by converting it to uppercase.\"\"\"
        return raw_data.upper()

    def transform_data(self, data: str) -> str:
        \"\"\"Transform the processed data\"\"\"
        return DataTransformer().transform(data)
```"""
    expected_read_write_context = """
import requests
from globals import API_URL
from utils import DataProcessor
import code_to_optimize.code_directories.retriever.main


def fetch_and_transform_data():
    # Use the global variable for the request
    response = requests.get(API_URL)

    raw_data = response.text

    # Use code from another file (utils.py)
    processor = DataProcessor()
    processed = processor.process_data(raw_data)
    transformed = processor.transform_data(processed)

    return transformed


def function_to_optimize():
    return code_to_optimize.code_directories.retriever.main.fetch_and_transform_data()
"""
    assert read_write_context.strip() == expected_read_write_context.strip()
    assert read_only_context.strip() == expected_read_only_context.strip()


def test_comfy_module_import() -> None:
    code = '''
import model_management


class HunyuanVideoClipModel(torch.nn.Module):
    def __init__(self, dtype_llama=None, device="cpu", dtype=None, model_options={}):
        super().__init__()
        dtype_llama = model_management.pick_weight_dtype(dtype_llama, dtype, device)
        self.clip_l = sd1_clip.SDClipModel(device=device, dtype=dtype, return_projected_pooled=False, model_options=model_options)
        self.llama = LLAMAModel(device=device, dtype=dtype_llama, model_options=model_options)
        self.dtypes = set([dtype, dtype_llama])

    def set_clip_options(self, options):
        self.clip_l.set_clip_options(options)
        self.llama.set_clip_options(options)

    def reset_clip_options(self):
        self.clip_l.reset_clip_options()
        self.llama.reset_clip_options()

    def encode_token_weights(self, token_weight_pairs):
        token_weight_pairs_l = token_weight_pairs["l"]
        token_weight_pairs_llama = token_weight_pairs["llama"]

        llama_out, llama_pooled, llama_extra_out = self.llama.encode_token_weights(token_weight_pairs_llama)

        template_end = 0
        extra_template_end = 0
        extra_sizes = 0
        user_end = 9999999999999
        images = []

        tok_pairs = token_weight_pairs_llama[0]
        for i, v in enumerate(tok_pairs):
            elem = v[0]
            if not torch.is_tensor(elem):
                if isinstance(elem, numbers.Integral):
                    if elem == 128006:
                        if tok_pairs[i + 1][0] == 882:
                            if tok_pairs[i + 2][0] == 128007:
                                template_end = i + 2
                                user_end = -1
                    if elem == 128009 and user_end == -1:
                        user_end = i + 1
                else:
                    if elem.get("original_type") == "image":
                        elem_size = elem.get("data").shape[0]
                        if template_end > 0:
                            if user_end == -1:
                                extra_template_end += elem_size - 1
                        else:
                            image_start = i + extra_sizes
                            image_end = i + elem_size + extra_sizes
                            images.append((image_start, image_end, elem.get("image_interleave", 1)))
                            extra_sizes += elem_size - 1

        if llama_out.shape[1] > (template_end + 2):
            if tok_pairs[template_end + 1][0] == 271:
                template_end += 2
        llama_output = llama_out[:, template_end + extra_sizes:user_end + extra_sizes + extra_template_end]
        llama_extra_out["attention_mask"] = llama_extra_out["attention_mask"][:, template_end + extra_sizes:user_end + extra_sizes + extra_template_end]
        if llama_extra_out["attention_mask"].sum() == torch.numel(llama_extra_out["attention_mask"]):
            llama_extra_out.pop("attention_mask")  # attention mask is useless if no masked elements

        if len(images) > 0:
            out = []
            for i in images:
                out.append(llama_out[:, i[0]: i[1]: i[2]])
            llama_output = torch.cat(out + [llama_output], dim=1)

        l_out, l_pooled = self.clip_l.encode_token_weights(token_weight_pairs_l)
        return llama_output, l_pooled, llama_extra_out

    def load_sd(self, sd):
        if "text_model.encoder.layers.1.mlp.fc1.weight" in sd:
            return self.clip_l.load_sd(sd)
        else:
            return self.llama.load_sd(sd)
'''
    model_management_code = '''
import psutil
import logging
from enum import Enum
from comfy.cli_args import args, PerformanceFeature
import torch
import sys
import platform
import weakref
import gc


class VRAMState(Enum):
    DISABLED = 0    #No vram present: no need to move models to vram
    NO_VRAM = 1     #Very low vram: enable all the options to save vram
    LOW_VRAM = 2
    NORMAL_VRAM = 3
    HIGH_VRAM = 4
    SHARED = 5      #No dedicated vram: memory shared between CPU and GPU but models still need to be moved between both.

class CPUState(Enum):
    GPU = 0
    CPU = 1
    MPS = 2

# Determine VRAM State
vram_state = VRAMState.NORMAL_VRAM
set_vram_to = VRAMState.NORMAL_VRAM
cpu_state = CPUState.GPU

total_vram = 0

def get_supported_float8_types():
    float8_types = []
    try:
        float8_types.append(torch.float8_e4m3fn)
    except:
        pass
    try:
        float8_types.append(torch.float8_e4m3fnuz)
    except:
        pass
    try:
        float8_types.append(torch.float8_e5m2)
    except:
        pass
    try:
        float8_types.append(torch.float8_e5m2fnuz)
    except:
        pass
    try:
        float8_types.append(torch.float8_e8m0fnu)
    except:
        pass
    return float8_types

FLOAT8_TYPES = get_supported_float8_types()

xpu_available = False
torch_version = ""
try:
    torch_version = torch.version.__version__
    temp = torch_version.split(".")
    torch_version_numeric = (int(temp[0]), int(temp[1]))
    xpu_available = (torch_version_numeric[0] < 2 or (torch_version_numeric[0] == 2 and torch_version_numeric[1] <= 4)) and torch.xpu.is_available()
except:
    pass

lowvram_available = True
if args.deterministic:
    logging.info("Using deterministic algorithms for pytorch")
    torch.use_deterministic_algorithms(True, warn_only=True)

directml_enabled = False
if args.directml is not None:
    import torch_directml
    directml_enabled = True
    device_index = args.directml
    if device_index < 0:
        directml_device = torch_directml.device()
    else:
        directml_device = torch_directml.device(device_index)
    logging.info("Using directml with device: {}".format(torch_directml.device_name(device_index)))
    # torch_directml.disable_tiled_resources(True)
    lowvram_available = False #TODO: need to find a way to get free memory in directml before this can be enabled by default.

try:
    import intel_extension_for_pytorch as ipex
    _ = torch.xpu.device_count()
    xpu_available = xpu_available or torch.xpu.is_available()
except:
    xpu_available = xpu_available or (hasattr(torch, "xpu") and torch.xpu.is_available())

try:
    if torch.backends.mps.is_available():
        cpu_state = CPUState.MPS
        import torch.mps
except:
    pass

try:
    import torch_npu  # noqa: F401
    _ = torch.npu.device_count()
    npu_available = torch.npu.is_available()
except:
    npu_available = False

try:
    import torch_mlu  # noqa: F401
    _ = torch.mlu.device_count()
    mlu_available = torch.mlu.is_available()
except:
    mlu_available = False

if args.cpu:
    cpu_state = CPUState.CPU

def supports_cast(device, dtype): #TODO
    if dtype == torch.float32:
        return True
    if dtype == torch.float16:
        return True
    if directml_enabled: #TODO: test this
        return False
    if dtype == torch.bfloat16:
        return True
    if is_device_mps(device):
        return False
    if dtype == torch.float8_e4m3fn:
        return True
    if dtype == torch.float8_e5m2:
        return True
    return False

def pick_weight_dtype(dtype, fallback_dtype, device=None):
    if dtype is None:
        dtype = fallback_dtype
    elif dtype_size(dtype) > dtype_size(fallback_dtype):
        dtype = fallback_dtype

    if not supports_cast(device, dtype):
        dtype = fallback_dtype

    return dtype

'''

    # Create a temporary directory instead of a single file
    with tempfile.TemporaryDirectory() as temp_dir:
        # Create a package structure
        package_dir = Path(temp_dir) / "package"
        package_dir.mkdir()

        # Create the __init__.py file to make it a proper package
        with open(package_dir / "__init__.py", "w") as init_file:
            init_file.write("")

        # Write the model_management.py file
        with open(package_dir / "model_management.py", "w") as model_file:
            model_file.write(model_management_code)
            model_file.flush()

        # Write the main code file that imports from model_management
        main_file_path = package_dir / "main_module.py"
        with open(main_file_path, "w") as main_file:
            main_file.write(code)
            main_file.flush()

        # Now set up the optimizer with the path to the main file
        file_path = main_file_path.resolve()
        opt = Optimizer(
            Namespace(
                project_root=package_dir.resolve(),
                disable_telemetry=True,
                tests_root="tests",
                test_framework="pytest",
                pytest_cmd="pytest",
                experiment_id=None,
                test_project_root=Path().resolve(),
            )
        )

        function_to_optimize = FunctionToOptimize(
            function_name="encode_token_weights",
            file_path=file_path,
            parents=[FunctionParent(name="HunyuanVideoClipModel", type="ClassDef")],
            starting_line=None,
            ending_line=None,
        )

        code_ctx = get_code_optimization_context(function_to_optimize, opt.args.project_root)
        read_write_context, read_only_context = code_ctx.read_writable_code, code_ctx.read_only_context_code
        expected_read_write_context = """
import model_management


class HunyuanVideoClipModel(torch.nn.Module):
    def __init__(self, dtype_llama=None, device="cpu", dtype=None, model_options={}):
        super().__init__()
        dtype_llama = model_management.pick_weight_dtype(dtype_llama, dtype, device)
        self.clip_l = sd1_clip.SDClipModel(device=device, dtype=dtype, return_projected_pooled=False, model_options=model_options)
        self.llama = LLAMAModel(device=device, dtype=dtype_llama, model_options=model_options)
        self.dtypes = set([dtype, dtype_llama])

    def encode_token_weights(self, token_weight_pairs):
        token_weight_pairs_l = token_weight_pairs["l"]
        token_weight_pairs_llama = token_weight_pairs["llama"]

        llama_out, llama_pooled, llama_extra_out = self.llama.encode_token_weights(token_weight_pairs_llama)

        template_end = 0
        extra_template_end = 0
        extra_sizes = 0
        user_end = 9999999999999
        images = []

        tok_pairs = token_weight_pairs_llama[0]
        for i, v in enumerate(tok_pairs):
            elem = v[0]
            if not torch.is_tensor(elem):
                if isinstance(elem, numbers.Integral):
                    if elem == 128006:
                        if tok_pairs[i + 1][0] == 882:
                            if tok_pairs[i + 2][0] == 128007:
                                template_end = i + 2
                                user_end = -1
                    if elem == 128009 and user_end == -1:
                        user_end = i + 1
                else:
                    if elem.get("original_type") == "image":
                        elem_size = elem.get("data").shape[0]
                        if template_end > 0:
                            if user_end == -1:
                                extra_template_end += elem_size - 1
                        else:
                            image_start = i + extra_sizes
                            image_end = i + elem_size + extra_sizes
                            images.append((image_start, image_end, elem.get("image_interleave", 1)))
                            extra_sizes += elem_size - 1

        if llama_out.shape[1] > (template_end + 2):
            if tok_pairs[template_end + 1][0] == 271:
                template_end += 2
        llama_output = llama_out[:, template_end + extra_sizes:user_end + extra_sizes + extra_template_end]
        llama_extra_out["attention_mask"] = llama_extra_out["attention_mask"][:, template_end + extra_sizes:user_end + extra_sizes + extra_template_end]
        if llama_extra_out["attention_mask"].sum() == torch.numel(llama_extra_out["attention_mask"]):
            llama_extra_out.pop("attention_mask")  # attention mask is useless if no masked elements

        if len(images) > 0:
            out = []
            for i in images:
                out.append(llama_out[:, i[0]: i[1]: i[2]])
            llama_output = torch.cat(out + [llama_output], dim=1)

        l_out, l_pooled = self.clip_l.encode_token_weights(token_weight_pairs_l)
        return llama_output, l_pooled, llama_extra_out
"""
        expected_read_only_context = """
```python:model_management.py
# Determine VRAM State


def pick_weight_dtype(dtype, fallback_dtype, device=None):
    if dtype is None:
        dtype = fallback_dtype
    elif dtype_size(dtype) > dtype_size(fallback_dtype):
        dtype = fallback_dtype

    if not supports_cast(device, dtype):
        dtype = fallback_dtype

    return dtype
```
"""
        assert read_write_context.strip() == expected_read_write_context.strip()
        assert read_only_context.strip() == expected_read_only_context.strip()

@@ -1,6 +1,14 @@
import tempfile
from argparse import Namespace
from pathlib import Path

import libcst as cst

from codeflash.context.code_context_extractor import get_code_optimization_context
from codeflash.context.unused_definition_remover import remove_unused_definitions_by_function_names
from codeflash.discovery.functions_to_optimize import FunctionToOptimize
from codeflash.models.models import FunctionParent
from codeflash.optimization.optimizer import Optimizer


def test_variable_removal_only() -> None:

@@ -413,4 +421,4 @@ def unused_function():

    qualified_functions = {"get_platform_info", "get_loop_result"}
    result = remove_unused_definitions_by_function_names(code, qualified_functions)
    assert result.strip() == expected.strip()
    assert result.strip() == expected.strip()